jackylk commented on a change in pull request #3532: [CARBONDATA-3557] Write 
flink streaming data to partition table
URL: https://github.com/apache/carbondata/pull/3532#discussion_r361796904
 
 

 ##########
 File path: 
integration/flink/src/main/java/org/apache/carbon/flink/CarbonWriter.java
 ##########
 @@ -17,10 +17,229 @@
 
 package org.apache.carbon.flink;
 
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import 
org.apache.carbondata.core.datastore.exception.CarbonDataWriterException;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
+import org.apache.carbondata.core.statusmanager.StageInput;
+import org.apache.carbondata.core.util.CarbonUtil;
+
+import org.apache.log4j.Logger;
+
 /**
  * This class is a wrapper of CarbonWriter in SDK.
  * It not only write data to carbon with CarbonWriter in SDK, also generate 
segment file.
  */
-public abstract class CarbonWriter extends ProxyFileWriter<String> {
+public abstract class CarbonWriter extends ProxyFileWriter<Object[]> {
+
+  private static final Logger LOGGER =
+      LogServiceFactory.getLogService(CarbonS3Writer.class.getName());
+
+  public CarbonWriter(final CarbonWriterFactory factory,
+      final String identifier, final CarbonTable table) {
+    ProxyFileWriterFactory.register(factory.getType(), factory.getClass());
+    if (LOGGER.isDebugEnabled()) {
+      LOGGER.debug("Open writer. " + this.toString());
+    }
+    this.factory = factory;
+    this.identifier = identifier;
+    this.table = table;
+  }
+
+  private final CarbonWriterFactory factory;
+
+  private final String identifier;
+
+  protected final CarbonTable table;
+
+  @Override
+  public CarbonWriterFactory getFactory() {
+    return this.factory;
+  }
+
+  @Override
+  public String getIdentifier() {
+    return this.identifier;
+  }
+
+  /**
+   * @return when there is no data file uploaded, then return 
<code>null</code>.
+   */
+  protected StageInput uploadSegmentDataFiles(final String localPath, final 
String remotePath) {
+    if (!this.table.isHivePartitionTable()) {
+      final File[] files = new File(localPath).listFiles();
+      if (files == null) {
+        return null;
+      }
+      Map<String, Long> fileNameMapLength = new HashMap<>(files.length);
+      for (File file : files) {
+        fileNameMapLength.put(file.getName(), file.length());
+        if (LOGGER.isDebugEnabled()) {
+          LOGGER.debug(
+              "Upload file[" + file.getAbsolutePath() + "] to [" + remotePath 
+ "] start.");
+        }
+        try {
+          
CarbonUtil.copyCarbonDataFileToCarbonStorePath(file.getAbsolutePath(), 
remotePath, 1024);
+        } catch (CarbonDataWriterException exception) {
+          LOGGER.error(exception.getMessage(), exception);
+          throw exception;
+        }
+        if (LOGGER.isDebugEnabled()) {
+          LOGGER.debug("Upload file[" + file.getAbsolutePath() + "] to [" + 
remotePath + "] end.");
 
 Review comment:
   ```suggestion
             LOGGER.debug("Upload file[" + file.getAbsolutePath() + "] to [" + 
remotePath + "] finished.");
   ```

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to