jackylk commented on a change in pull request #3532: [CARBONDATA-3557] Write
flink streaming data to partition table
URL: https://github.com/apache/carbondata/pull/3532#discussion_r361796868
##########
File path:
integration/flink/src/main/java/org/apache/carbon/flink/CarbonWriter.java
##########
@@ -17,10 +17,229 @@
package org.apache.carbon.flink;
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datastore.exception.CarbonDataWriterException;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
+import org.apache.carbondata.core.statusmanager.StageInput;
+import org.apache.carbondata.core.util.CarbonUtil;
+
+import org.apache.log4j.Logger;
+
/**
* This class is a wrapper of CarbonWriter in SDK.
* It not only write data to carbon with CarbonWriter in SDK, also generate segment file.
*/
-public abstract class CarbonWriter extends ProxyFileWriter<String> {
+public abstract class CarbonWriter extends ProxyFileWriter<Object[]> {
+
+ private static final Logger LOGGER =
+ LogServiceFactory.getLogService(CarbonS3Writer.class.getName());
+
+ public CarbonWriter(final CarbonWriterFactory factory,
+ final String identifier, final CarbonTable table) {
+ ProxyFileWriterFactory.register(factory.getType(), factory.getClass());
+ if (LOGGER.isDebugEnabled()) {
+ LOGGER.debug("Open writer. " + this.toString());
+ }
+ this.factory = factory;
+ this.identifier = identifier;
+ this.table = table;
+ }
+
+ private final CarbonWriterFactory factory;
+
+ private final String identifier;
+
+ protected final CarbonTable table;
+
+ @Override
+ public CarbonWriterFactory getFactory() {
+ return this.factory;
+ }
+
+ @Override
+ public String getIdentifier() {
+ return this.identifier;
+ }
+
+ /**
+ * @return when there is no data file uploaded, then return <code>null</code>.
+ */
+ protected StageInput uploadSegmentDataFiles(final String localPath, final String remotePath) {
+ if (!this.table.isHivePartitionTable()) {
+ final File[] files = new File(localPath).listFiles();
+ if (files == null) {
Review comment:
```suggestion
if (files == null || files.length == 0) {
```
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services