jackylk commented on a change in pull request #3532: [CARBONDATA-3557] Write 
flink streaming data to partition table
URL: https://github.com/apache/carbondata/pull/3532#discussion_r361796868
 
 

 ##########
 File path: 
integration/flink/src/main/java/org/apache/carbon/flink/CarbonWriter.java
 ##########
 @@ -17,10 +17,229 @@
 
 package org.apache.carbon.flink;
 
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import 
org.apache.carbondata.core.datastore.exception.CarbonDataWriterException;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
+import org.apache.carbondata.core.statusmanager.StageInput;
+import org.apache.carbondata.core.util.CarbonUtil;
+
+import org.apache.log4j.Logger;
+
 /**
  * This class is a wrapper of CarbonWriter in SDK.
  * It not only write data to carbon with CarbonWriter in SDK, also generate 
segment file.
  */
-public abstract class CarbonWriter extends ProxyFileWriter<String> {
+public abstract class CarbonWriter extends ProxyFileWriter<Object[]> {
+
+  private static final Logger LOGGER =
+      LogServiceFactory.getLogService(CarbonS3Writer.class.getName());
+
+  public CarbonWriter(final CarbonWriterFactory factory,
+      final String identifier, final CarbonTable table) {
+    ProxyFileWriterFactory.register(factory.getType(), factory.getClass());
+    if (LOGGER.isDebugEnabled()) {
+      LOGGER.debug("Open writer. " + this.toString());
+    }
+    this.factory = factory;
+    this.identifier = identifier;
+    this.table = table;
+  }
+
+  private final CarbonWriterFactory factory;
+
+  private final String identifier;
+
+  protected final CarbonTable table;
+
+  @Override
+  public CarbonWriterFactory getFactory() {
+    return this.factory;
+  }
+
+  @Override
+  public String getIdentifier() {
+    return this.identifier;
+  }
+
+  /**
+   * @return when there is no data file uploaded, then return 
<code>null</code>.
+   */
+  protected StageInput uploadSegmentDataFiles(final String localPath, final 
String remotePath) {
+    if (!this.table.isHivePartitionTable()) {
+      final File[] files = new File(localPath).listFiles();
+      if (files == null) {
 
 Review comment:
   ```suggestion
         if (files == null || files.length == 0) {
   ```

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to