jackylk commented on a change in pull request #3532: [CARBONDATA-3557] Write flink streaming data to partition table URL: https://github.com/apache/carbondata/pull/3532#discussion_r361796868
########## File path: integration/flink/src/main/java/org/apache/carbon/flink/CarbonWriter.java ########## @@ -17,10 +17,229 @@ package org.apache.carbon.flink; +import java.io.File; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.carbondata.common.logging.LogServiceFactory; +import org.apache.carbondata.core.constants.CarbonCommonConstants; +import org.apache.carbondata.core.datastore.exception.CarbonDataWriterException; +import org.apache.carbondata.core.metadata.schema.table.CarbonTable; +import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema; +import org.apache.carbondata.core.statusmanager.StageInput; +import org.apache.carbondata.core.util.CarbonUtil; + +import org.apache.log4j.Logger; + /** * This class is a wrapper of CarbonWriter in SDK. * It not only write data to carbon with CarbonWriter in SDK, also generate segment file. */ -public abstract class CarbonWriter extends ProxyFileWriter<String> { +public abstract class CarbonWriter extends ProxyFileWriter<Object[]> { + + private static final Logger LOGGER = + LogServiceFactory.getLogService(CarbonS3Writer.class.getName()); + + public CarbonWriter(final CarbonWriterFactory factory, + final String identifier, final CarbonTable table) { + ProxyFileWriterFactory.register(factory.getType(), factory.getClass()); + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("Open writer. " + this.toString()); + } + this.factory = factory; + this.identifier = identifier; + this.table = table; + } + + private final CarbonWriterFactory factory; + + private final String identifier; + + protected final CarbonTable table; + + @Override + public CarbonWriterFactory getFactory() { + return this.factory; + } + + @Override + public String getIdentifier() { + return this.identifier; + } + + /** + * @return when there is no data file uploaded, then return <code>null</code>. + */ + protected StageInput uploadSegmentDataFiles(final String localPath, final String remotePath) { + if (!this.table.isHivePartitionTable()) { + final File[] files = new File(localPath).listFiles(); + if (files == null) { Review comment: ```suggestion if (files == null || files.length == 0) { ``` ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services