This is an automated email from the ASF dual-hosted git repository.

jackylk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git


The following commit(s) were added to refs/heads/master by this push:
     new 1696f8d  [CARBONDATA-3557] Support write Flink streaming data to Carbon
1696f8d is described below

commit 1696f8d288a2154ed5211cb7376c080523dfaea5
Author: liuzhi <371684...@qq.com>
AuthorDate: Wed Nov 27 17:44:21 2019 +0800

    [CARBONDATA-3557] Support write Flink streaming data to Carbon
    
    The write process is:
    
    1. For every checkpoint in each Flink task, write data to local file system 
by StreamingFileSink and carbon SDK;
    2. Copy local carbon data file to carbon data store system, such as HDFS, 
S3;
    3. Generate and write metadata file and success file to 
${tablePath}/Metadata/stage folder as a commit;
    
    This closes #3421
---
 .../core/indexstore/UnsafeMemoryDMStore.java       |  10 +-
 .../indexstore/blockletindex/BlockDataMap.java     |  16 +-
 .../core/indexstore/row/DataMapRowImpl.java        |   5 +
 .../carbondata/core/statusmanager/FileFormat.java  |   3 +
 .../carbondata/core/statusmanager/StageInput.java  |  58 +++++
 .../apache/carbondata/core/util/CarbonUtil.java    |   8 +-
 .../carbondata/core/util/path/CarbonTablePath.java |   6 +
 .../hadoop/api/CarbonTableOutputFormat.java        |   8 +-
 integration/flink-proxy/pom.xml                    |  43 ++++
 .../org/apache/carbon/flink/ProxyFileSystem.java   | 126 ++++++++++
 .../carbon/flink/ProxyFileSystemFactory.java       |  43 ++++
 .../org/apache/carbon/flink/ProxyFileWriter.java   |  34 +++
 .../carbon/flink/ProxyFileWriterFactory.java       | 152 ++++++++++++
 .../org/apache/carbon/flink/ProxyRecoverable.java  |  53 ++++
 .../carbon/flink/ProxyRecoverableOutputStream.java | 126 ++++++++++
 .../carbon/flink/ProxyRecoverableSerializer.java   | 174 ++++++++++++++
 .../carbon/flink/ProxyRecoverableWriter.java       |  80 ++++++
 .../org.apache.flink.core.fs.FileSystemFactory     |   1 +
 integration/flink/pom.xml                          | 267 +++++++++++++++++++++
 .../apache/carbon/core/metadata/StageManager.java  | 101 ++++++++
 .../apache/carbon/flink/CarbonLocalProperty.java   |  30 +++
 .../org/apache/carbon/flink/CarbonLocalWriter.java | 233 ++++++++++++++++++
 .../carbon/flink/CarbonLocalWriterFactory.java     | 108 +++++++++
 .../flink/CarbonLocalWriterFactoryBuilder.java     |  53 ++++
 .../org/apache/carbon/flink/CarbonS3Property.java  |  36 +++
 .../org/apache/carbon/flink/CarbonS3Writer.java    | 219 +++++++++++++++++
 .../apache/carbon/flink/CarbonS3WriterFactory.java | 157 ++++++++++++
 .../carbon/flink/CarbonS3WriterFactoryBuilder.java |  54 +++++
 .../java/org/apache/carbon/flink/CarbonWriter.java |  26 ++
 .../apache/carbon/flink/CarbonWriterFactory.java   |  77 ++++++
 .../carbon/flink/CarbonWriterFactoryBuilder.java   |  74 ++++++
 ....apache.carbon.flink.CarbonWriterFactoryBuilder |   2 +
 .../org/apache/carbon/flink/TestCarbonWriter.scala | 124 ++++++++++
 .../scala/org/apache/carbon/flink/TestSource.scala |  52 ++++
 pom.xml                                            |   2 +
 .../carbondata/sdk/file/CarbonWriterBuilder.java   |  14 +-
 36 files changed, 2563 insertions(+), 12 deletions(-)

diff --git 
a/core/src/main/java/org/apache/carbondata/core/indexstore/UnsafeMemoryDMStore.java
 
b/core/src/main/java/org/apache/carbondata/core/indexstore/UnsafeMemoryDMStore.java
index 5ae72fc..8d9cb57 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/indexstore/UnsafeMemoryDMStore.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/indexstore/UnsafeMemoryDMStore.java
@@ -231,10 +231,12 @@ public class UnsafeMemoryDMStore extends 
AbstractMemoryDMStore {
         getUnsafe().putInt(memoryBlock.getBaseObject(),
             memoryBlock.getBaseOffset() + startOffset + 
schema.getBytePosition(), varPosition);
         runningLength += 4;
-        getUnsafe().copyMemory(data, BYTE_ARRAY_OFFSET, 
memoryBlock.getBaseObject(),
-            memoryBlock.getBaseOffset() + startOffset + varPosition, 
data.length);
-        runningLength += data.length;
-        varPosition += data.length;
+        if (data != null) {
+          getUnsafe().copyMemory(data, BYTE_ARRAY_OFFSET, 
memoryBlock.getBaseObject(),
+                  memoryBlock.getBaseOffset() + startOffset + varPosition, 
data.length);
+          runningLength += data.length;
+          varPosition += data.length;
+        }
         return varPosition;
       default:
         throw new UnsupportedOperationException(
diff --git 
a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockDataMap.java
 
b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockDataMap.java
index 52cd8cc..e83985f 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockDataMap.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockDataMap.java
@@ -130,7 +130,7 @@ public class BlockDataMap extends CoarseGrainDataMap
     Path path = new Path(blockletDataMapInfo.getFilePath());
     // store file path only in case of partition table, non transactional 
table and flat folder
     // structure
-    byte[] filePath = null;
+    byte[] filePath;
     boolean isPartitionTable = 
blockletDataMapInfo.getCarbonTable().isHivePartitionTable();
     if (isPartitionTable || 
!blockletDataMapInfo.getCarbonTable().isTransactionalTable() ||
         blockletDataMapInfo.getCarbonTable().isSupportFlatFolder() ||
@@ -139,6 +139,8 @@ public class BlockDataMap extends CoarseGrainDataMap
             blockletDataMapInfo.getCarbonTable().getTablePath())) {
       filePath = 
path.getParent().toString().getBytes(CarbonCommonConstants.DEFAULT_CHARSET);
       isFilePathStored = true;
+    } else {
+      filePath = new byte[0];
     }
     byte[] fileName = 
path.getName().getBytes(CarbonCommonConstants.DEFAULT_CHARSET);
     byte[] segmentId =
@@ -506,7 +508,7 @@ public class BlockDataMap extends CoarseGrainDataMap
     if (null != summaryRow) {
       summaryRow.setByteArray(fileName, SUMMARY_INDEX_FILE_NAME);
       summaryRow.setByteArray(segmentId, SUMMARY_SEGMENTID);
-      if (null != filePath) {
+      if (filePath.length > 0) {
         summaryRow.setByteArray(filePath, SUMMARY_INDEX_PATH);
       }
       try {
@@ -635,8 +637,14 @@ public class BlockDataMap extends CoarseGrainDataMap
       // dummy value
       return 0;
     } else {
-      return ByteBuffer.wrap(getBlockletRowCountForEachBlock()).getShort(
-          index * CarbonCommonConstants.SHORT_SIZE_IN_BYTE);
+      final byte[] bytes = getBlockletRowCountForEachBlock();
+      // if the segment data is written in tablepath
+      // then the result of getBlockletRowCountForEachBlock will be empty.
+      if (bytes.length == 0) {
+        return 0;
+      } else {
+        return ByteBuffer.wrap(bytes).getShort(index * 
CarbonCommonConstants.SHORT_SIZE_IN_BYTE);
+      }
     }
   }
 
diff --git 
a/core/src/main/java/org/apache/carbondata/core/indexstore/row/DataMapRowImpl.java
 
b/core/src/main/java/org/apache/carbondata/core/indexstore/row/DataMapRowImpl.java
index 683d2ef..22eca8a 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/indexstore/row/DataMapRowImpl.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/indexstore/row/DataMapRowImpl.java
@@ -41,6 +41,11 @@ public class DataMapRowImpl extends DataMapRow {
 
   @Override
   public int getLengthInBytes(int ordinal) {
+    // if the segment data is written in tablepath
+    // then the data[BlockletDataMapRowIndexes.SUMMARY_INDEX_PATH] will be 
null.
+    if (data[ordinal] == null) {
+      return 0;
+    }
     return ((byte[]) data[ordinal]).length;
   }
 
diff --git 
a/core/src/main/java/org/apache/carbondata/core/statusmanager/FileFormat.java 
b/core/src/main/java/org/apache/carbondata/core/statusmanager/FileFormat.java
index d68429b..b5916fe 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/statusmanager/FileFormat.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/statusmanager/FileFormat.java
@@ -46,6 +46,9 @@ public class FileFormat implements Serializable {
   }
 
   public static FileFormat getByOrdinal(int ordinal) {
+    if (ordinal < 0) {
+      throw new IllegalArgumentException("Argument [ordinal] is less than 0.");
+    }
     switch (ordinal) {
       case 0:
         return COLUMNAR_V3;
diff --git 
a/core/src/main/java/org/apache/carbondata/core/statusmanager/StageInput.java 
b/core/src/main/java/org/apache/carbondata/core/statusmanager/StageInput.java
new file mode 100644
index 0000000..60f8b32
--- /dev/null
+++ 
b/core/src/main/java/org/apache/carbondata/core/statusmanager/StageInput.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.statusmanager;
+
+import java.util.Map;
+
+public class StageInput {
+
+  /**
+   * the base path of files
+   */
+  private String base;
+
+  /**
+   * the list of (file, length) in this StageInput
+   */
+  private Map<String, Long> files;
+
+  public StageInput() {
+
+  }
+
+  public StageInput(String base, Map<String, Long> files) {
+    this.base = base;
+    this.files = files;
+  }
+
+  public String getBase() {
+    return base;
+  }
+
+  public void setBase(String base) {
+    this.base = base;
+  }
+
+  public Map<String, Long> getFiles() {
+    return files;
+  }
+
+  public void setFiles(Map<String, Long> files) {
+    this.files = files;
+  }
+}
diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java 
b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
index 0b83a838..00d8b2a 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
@@ -2847,8 +2847,12 @@ public final class CarbonUtil {
           .getDataInputStream(localFilePath, bufferSize);
       IOUtils.copyBytes(dataInputStream, dataOutputStream, bufferSize);
     } finally {
-      CarbonUtil.closeStream(dataInputStream);
-      CarbonUtil.closeStream(dataOutputStream);
+      try {
+        CarbonUtil.closeStream(dataInputStream);
+        CarbonUtil.closeStream(dataOutputStream);
+      } catch (IOException exception) {
+        LOGGER.error(exception.getMessage(), exception);
+      }
     }
   }
 
diff --git 
a/core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java 
b/core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java
index b72589d..6f46fb9 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java
@@ -55,6 +55,8 @@ public class CarbonTablePath {
   private static final String STREAMING_DIR = ".streaming";
   private static final String STREAMING_LOG_DIR = "log";
   private static final String STREAMING_CHECKPOINT_DIR = "checkpoint";
+  private static final String STAGE_DIR = "stage";
+  public static final String  SUCCESS_FILE_SUBFIX = ".success";
 
   /**
    * This class provides static utility only.
@@ -62,6 +64,10 @@ public class CarbonTablePath {
   private CarbonTablePath() {
   }
 
+  public static String getStageDir(String tablePath) {
+    return getMetadataPath(tablePath) + CarbonCommonConstants.FILE_SEPARATOR + 
STAGE_DIR;
+  }
+
   /**
    * The method returns the folder path containing the carbon file.
    *
diff --git 
a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java
 
b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java
index fee2e4a..7700c8e 100644
--- 
a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java
+++ 
b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java
@@ -122,6 +122,12 @@ public class CarbonTableOutputFormat extends 
FileOutputFormat<NullWritable, Obje
 
   private CarbonOutputCommitter committer;
 
+  /**
+   * Output format task id generator. It should generate a unique id for every 
task.
+   * It may conflict when System.nanoTime() is used as the task id.
+   */
+  private static final AtomicLong DEFAULT_TASK_NO = new AtomicLong(0);
+
   public static void setDatabaseName(Configuration configuration, String 
databaseName) {
     if (null != databaseName) {
       configuration.set(DATABASE_NAME, databaseName);
@@ -255,7 +261,7 @@ public class CarbonTableOutputFormat extends 
FileOutputFormat<NullWritable, Obje
     }
     if (null == loadModel.getTaskNo() || loadModel.getTaskNo().isEmpty()) {
       loadModel.setTaskNo(taskAttemptContext.getConfiguration()
-          .get("carbon.outputformat.taskno", 
String.valueOf(System.nanoTime())));
+          .get("carbon.outputformat.taskno", 
String.valueOf(DEFAULT_TASK_NO.getAndIncrement())));
     }
     loadModel.setDataWritePath(
         
taskAttemptContext.getConfiguration().get("carbon.outputformat.writepath"));
diff --git a/integration/flink-proxy/pom.xml b/integration/flink-proxy/pom.xml
new file mode 100644
index 0000000..27af507
--- /dev/null
+++ b/integration/flink-proxy/pom.xml
@@ -0,0 +1,43 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+    <parent>
+        <groupId>org.apache.carbondata</groupId>
+        <artifactId>carbondata-parent</artifactId>
+        <version>2.0.0-SNAPSHOT</version>
+        <relativePath>../../pom.xml</relativePath>
+    </parent>
+
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>carbondata-flink-proxy</artifactId>
+    <name>Apache CarbonData :: Flink Proxy</name>
+    <packaging>jar</packaging>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-streaming-java_2.11</artifactId>
+            <version>1.8.0</version>
+        </dependency>
+    </dependencies>
+
+    <properties>
+        <dev.path>${basedir}/../../dev</dev.path>
+    </properties>
+
+    <build>
+        <plugins>
+            <plugin>
+                <artifactId>maven-compiler-plugin</artifactId>
+                <configuration>
+                    <source>1.8</source>
+                    <target>1.8</target>
+                </configuration>
+            </plugin>
+        </plugins>
+    </build>
+
+</project>
\ No newline at end of file
diff --git 
a/integration/flink-proxy/src/main/java/org/apache/carbon/flink/ProxyFileSystem.java
 
b/integration/flink-proxy/src/main/java/org/apache/carbon/flink/ProxyFileSystem.java
new file mode 100644
index 0000000..8c557a9
--- /dev/null
+++ 
b/integration/flink-proxy/src/main/java/org/apache/carbon/flink/ProxyFileSystem.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbon.flink;
+
+import java.net.URI;
+
+import org.apache.flink.core.fs.BlockLocation;
+import org.apache.flink.core.fs.FSDataInputStream;
+import org.apache.flink.core.fs.FSDataOutputStream;
+import org.apache.flink.core.fs.FileStatus;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.FileSystemKind;
+import org.apache.flink.core.fs.Path;
+
+/**
+ * Flink requires the user to provide a file output stream when using StreamingFileSink,
+ * but CarbonWriter encapsulates the file output stream inside the SDK,
+ * so we need a proxy file output stream to connect StreamingFileSink and 
CarbonWriter.
+ */
+public final class ProxyFileSystem extends FileSystem {
+
+  public static final String SCHEMA = "proxy";
+
+  public static final URI DEFAULT_URI = URI.create(SCHEMA + ":/");
+
+  public static final ProxyFileSystem INSTANCE = new ProxyFileSystem();
+
+  private ProxyFileSystem() {
+    // private constructor.
+  }
+
+  @Override
+  public Path getWorkingDirectory() {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public Path getHomeDirectory() {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public URI getUri() {
+    return DEFAULT_URI;
+  }
+
+  @Override
+  public FileStatus getFileStatus(final Path path) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public BlockLocation[] getFileBlockLocations(
+          final FileStatus fileStatus,
+          final long offset,
+          final long length
+  ) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public FSDataInputStream open(final Path path, final int i) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public FSDataInputStream open(final Path path) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public FileStatus[] listStatus(final Path path) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public boolean delete(final Path path, final boolean b) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public boolean mkdirs(final Path path) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public FSDataOutputStream create(final Path path, final WriteMode writeMode) 
{
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public ProxyRecoverableWriter createRecoverableWriter() {
+    return new ProxyRecoverableWriter();
+  }
+
+  @Override
+  public boolean rename(final Path source, final Path target) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public boolean isDistributedFS() {
+    return false;
+  }
+
+  @Override
+  public FileSystemKind getKind() {
+    return FileSystemKind.FILE_SYSTEM;
+  }
+
+}
diff --git 
a/integration/flink-proxy/src/main/java/org/apache/carbon/flink/ProxyFileSystemFactory.java
 
b/integration/flink-proxy/src/main/java/org/apache/carbon/flink/ProxyFileSystemFactory.java
new file mode 100644
index 0000000..1adb44c
--- /dev/null
+++ 
b/integration/flink-proxy/src/main/java/org/apache/carbon/flink/ProxyFileSystemFactory.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbon.flink;
+
+import java.io.IOException;
+import java.net.URI;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FileSystemFactory;
+
+public final class ProxyFileSystemFactory implements FileSystemFactory {
+
+  @Override
+  public String getScheme() {
+    return ProxyFileSystem.SCHEMA;
+  }
+
+  @Override
+  public void configure(final Configuration configuration) {
+    // nothing to do.
+  }
+
+  @Override
+  public ProxyFileSystem create(final URI uri) throws IOException {
+    return ProxyFileSystem.INSTANCE;
+  }
+
+}
diff --git 
a/integration/flink-proxy/src/main/java/org/apache/carbon/flink/ProxyFileWriter.java
 
b/integration/flink-proxy/src/main/java/org/apache/carbon/flink/ProxyFileWriter.java
new file mode 100644
index 0000000..f8ef039
--- /dev/null
+++ 
b/integration/flink-proxy/src/main/java/org/apache/carbon/flink/ProxyFileWriter.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbon.flink;
+
+import java.io.IOException;
+
+import org.apache.flink.api.common.serialization.BulkWriter;
+
+public abstract class ProxyFileWriter<OUT> implements BulkWriter<OUT> {
+
+  public abstract ProxyFileWriterFactory getFactory();
+
+  public abstract String getPartition();
+
+  public abstract void commit() throws IOException;
+
+  public abstract void close() throws IOException;
+
+}
diff --git 
a/integration/flink-proxy/src/main/java/org/apache/carbon/flink/ProxyFileWriterFactory.java
 
b/integration/flink-proxy/src/main/java/org/apache/carbon/flink/ProxyFileWriterFactory.java
new file mode 100644
index 0000000..bb683a3
--- /dev/null
+++ 
b/integration/flink-proxy/src/main/java/org/apache/carbon/flink/ProxyFileWriterFactory.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbon.flink;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.flink.api.common.serialization.BulkWriter;
+
+public abstract class ProxyFileWriterFactory<OUT> implements 
BulkWriter.Factory<OUT> {
+
+  private static final long serialVersionUID = -1449889091046572219L;
+
+  private static final Map<String, Class<? extends ProxyFileWriterFactory>> 
FACTORY_MAP =
+      new ConcurrentHashMap<>();
+
+  public static ProxyFileWriterFactory newInstance(final String factoryType) {
+    if (factoryType == null) {
+      throw new IllegalArgumentException("Argument [factoryType] is null.");
+    }
+    final Class<? extends ProxyFileWriterFactory> factoryClass = 
FACTORY_MAP.get(factoryType);
+    if (factoryClass == null) {
+      return null;
+    }
+    try {
+      return factoryClass.newInstance();
+    } catch (InstantiationException | IllegalAccessException exception) {
+      throw new RuntimeException(exception);
+    }
+  }
+
+  public static void register(
+        final String factoryType,
+        final Class<? extends ProxyFileWriterFactory> factoryClass
+  ) {
+    if (factoryType == null) {
+      throw new IllegalArgumentException("Argument [factoryType] is null.");
+    }
+    if (factoryClass == null) {
+      throw new IllegalArgumentException("Argument [factoryClass] is null.");
+    }
+    // TODO check the arguments
+    // TODO check whether the type is already registered; ignore duplicate registrations
+    FACTORY_MAP.put(factoryType, factoryClass);
+  }
+
+  private Configuration configuration;
+
+  public abstract String getType();
+
+  public Configuration getConfiguration() {
+    return this.configuration;
+  }
+
+  public void setConfiguration(final Configuration configuration) {
+    this.configuration = configuration;
+  }
+
+  public abstract ProxyFileWriter<OUT> create(String partition) throws 
IOException;
+
+  public static class Configuration implements Serializable {
+
+    private static final long serialVersionUID = 8615149992583690295L;
+
+    public Configuration(
+        final String databaseName,
+        final String tableName,
+        final String tablePath,
+        final Properties tableProperties,
+        final Properties writerProperties,
+        final Properties carbonProperties
+    ) {
+      if (tableName == null) {
+        throw new IllegalArgumentException("Argument [tableName] is null.");
+      }
+      if (tablePath == null) {
+        throw new IllegalArgumentException("Argument [tablePath] is null.");
+      }
+      if (tableProperties == null) {
+        throw new IllegalArgumentException("Argument [tableProperties] is 
null.");
+      }
+      if (writerProperties == null) {
+        throw new IllegalArgumentException("Argument [writerProperties] is 
null.");
+      }
+      if (carbonProperties == null) {
+        throw new IllegalArgumentException("Argument [carbonProperties] is 
null.");
+      }
+      this.databaseName = databaseName;
+      this.tableName = tableName;
+      this.tablePath = tablePath;
+      this.tableProperties = tableProperties;
+      this.writerProperties = writerProperties;
+      this.carbonProperties = carbonProperties;
+    }
+
+    private final String databaseName;
+
+    private final String tableName;
+
+    private final String tablePath;
+
+    private final Properties tableProperties;
+
+    private final Properties writerProperties;
+
+    private final Properties carbonProperties;
+
+    public String getDatabaseName() {
+      return this.databaseName;
+    }
+
+    public String getTableName() {
+      return this.tableName;
+    }
+
+    public String getTablePath() {
+      return this.tablePath;
+    }
+
+    public Properties getTableProperties() {
+      return this.tableProperties;
+    }
+
+    public Properties getWriterProperties() {
+      return this.writerProperties;
+    }
+
+    public Properties getCarbonProperties() {
+      return this.carbonProperties;
+    }
+
+  }
+
+}
diff --git 
a/integration/flink-proxy/src/main/java/org/apache/carbon/flink/ProxyRecoverable.java
 
b/integration/flink-proxy/src/main/java/org/apache/carbon/flink/ProxyRecoverable.java
new file mode 100644
index 0000000..11e5d5e
--- /dev/null
+++ 
b/integration/flink-proxy/src/main/java/org/apache/carbon/flink/ProxyRecoverable.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbon.flink;
+
+import org.apache.flink.core.fs.RecoverableWriter;
+
+public final class ProxyRecoverable
+        implements RecoverableWriter.CommitRecoverable, 
RecoverableWriter.ResumeRecoverable {
+
+  public ProxyRecoverable(
+      final String writerType,
+      final ProxyFileWriterFactory.Configuration writerConfiguration,
+      final String partition
+  ) {
+    this.writerType = writerType;
+    this.writerConfiguration = writerConfiguration;
+    this.partition = partition;
+  }
+
+  private final String writerType;
+
+  private final ProxyFileWriterFactory.Configuration writerConfiguration;
+
+  private final String partition;
+
+  public String getWriterType() {
+    return this.writerType;
+  }
+
+  public ProxyFileWriterFactory.Configuration getWriterConfiguration() {
+    return this.writerConfiguration;
+  }
+
+  public String getPartition() {
+    return this.partition;
+  }
+
+}
diff --git 
a/integration/flink-proxy/src/main/java/org/apache/carbon/flink/ProxyRecoverableOutputStream.java
 
b/integration/flink-proxy/src/main/java/org/apache/carbon/flink/ProxyRecoverableOutputStream.java
new file mode 100644
index 0000000..1e59209
--- /dev/null
+++ 
b/integration/flink-proxy/src/main/java/org/apache/carbon/flink/ProxyRecoverableOutputStream.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbon.flink;
+
+import java.io.IOException;
+
+import org.apache.flink.core.fs.RecoverableFsDataOutputStream;
+import org.apache.flink.core.fs.RecoverableWriter;
+
+/**
+ * Recoverable output stream of the proxy file system. It carries no bytes itself: every
+ * byte-oriented operation ({@link #write(int)}, {@link #flush()}, {@link #sync()},
+ * {@link #getPos()}, {@link #persist()}) throws {@link UnsupportedOperationException}.
+ * Instead a {@link ProxyFileWriter} is bound to the stream, and closing or committing the
+ * stream is forwarded to that writer.
+ */
+public final class ProxyRecoverableOutputStream extends RecoverableFsDataOutputStream {
+
+  /** The writer bound through {@link #bind(ProxyFileWriter)}; {@code null} until bound. */
+  private ProxyFileWriter<?> writer;
+
+  ProxyRecoverableOutputStream() {
+    // package-private constructor.
+  }
+
+  /**
+   * Binds the writer this stream delegates to. A stream can be bound only once.
+   *
+   * @param writer the writer to bind, must not be {@code null}
+   * @throws IllegalArgumentException if {@code writer} is {@code null}
+   * @throws IllegalStateException    if a writer has already been bound
+   */
+  public void bind(final ProxyFileWriter<?> writer) {
+    if (writer == null) {
+      throw new IllegalArgumentException("Argument [writer] is null.");
+    }
+    if (this.writer != null) {
+      throw new IllegalStateException("The writer was bound.");
+    }
+    this.writer = writer;
+  }
+
+  @Override
+  public long getPos() {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public RecoverableWriter.ResumeRecoverable persist() {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public void write(final int aByte) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public void flush() {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public void sync() {
+    throw new UnsupportedOperationException();
+  }
+
+  /**
+   * Closes the bound writer, if any. Closing an unbound stream does nothing.
+   *
+   * @throws IOException if closing the writer fails
+   */
+  @Override
+  public void close() throws IOException {
+    // TODO: this method is invoked both when streaming ends and when an exception occurs.
+    if (this.writer != null) {
+      this.writer.close();
+    }
+  }
+
+  /**
+   * Captures the bound writer's factory type, factory configuration and partition in a
+   * {@link ProxyRecoverable} and returns a committer for it.
+   *
+   * @return a committer holding the recoverable of the bound writer
+   * @throws IllegalStateException if no writer has been bound
+   */
+  @Override
+  public Committer closeForCommit() {
+    if (this.writer == null) {
+      throw new IllegalStateException("The writer was not bound.");
+    }
+    return new Committer(
+        new ProxyRecoverable(
+            this.writer.getFactory().getType(),
+            this.writer.getFactory().getConfiguration(),
+            this.writer.getPartition()
+        )
+    );
+  }
+
+  /**
+   * Committer that re-creates a writer from a {@link ProxyRecoverable} and commits it.
+   */
+  static final class Committer implements RecoverableFsDataOutputStream.Committer {
+
+    private final ProxyRecoverable recoverable;
+
+    Committer(final ProxyRecoverable recoverable) {
+      this.recoverable = recoverable;
+    }
+
+    /**
+     * Creates a new writer from the recoverable and calls its {@code commit()}.
+     *
+     * @throws IOException if creating or committing the writer fails
+     */
+    @Override
+    public void commit() throws IOException {
+      this.newWriter().commit();
+    }
+
+    @Override
+    public void commitAfterRecovery() throws IOException {
+      // to do nothing.
+    }
+
+    @Override
+    public RecoverableWriter.CommitRecoverable getRecoverable() {
+      return this.recoverable;
+    }
+
+    /**
+     * Looks up the writer factory for the recoverable's writer type, applies the recoverable's
+     * configuration and creates a writer for its partition.
+     *
+     * @throws UnsupportedOperationException if no factory exists for the writer type
+     */
+    private ProxyFileWriter<?> newWriter() throws IOException {
+      final String writerType = this.recoverable.getWriterType();
+      final ProxyFileWriterFactory writerFactory = ProxyFileWriterFactory.newInstance(writerType);
+      if (writerFactory == null) {
+        // Name the offending type so a misconfigured or unregistered writer can be diagnosed.
+        throw new UnsupportedOperationException(
+            "Unsupported writer type [" + writerType + "].");
+      }
+      writerFactory.setConfiguration(this.recoverable.getWriterConfiguration());
+      return writerFactory.create(this.recoverable.getPartition());
+    }
+
+  }
+
+}
diff --git 
a/integration/flink-proxy/src/main/java/org/apache/carbon/flink/ProxyRecoverableSerializer.java
 
b/integration/flink-proxy/src/main/java/org/apache/carbon/flink/ProxyRecoverableSerializer.java
new file mode 100644
index 0000000..8bf80bd
--- /dev/null
+++ 
b/integration/flink-proxy/src/main/java/org/apache/carbon/flink/ProxyRecoverableSerializer.java
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbon.flink;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+import java.nio.ByteBuffer;
+import java.util.Properties;
+import java.util.Set;
+
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+
+/**
+ * Versioned binary serializer for {@link ProxyRecoverable}.
+ *
+ * <p>Layout (all integers big-endian): writer type, configuration (database name, table name,
+ * table path, table properties, writer properties, carbon properties), partition. A string is
+ * one marker byte, followed for non-null values by the UTF-8 byte length and the bytes. A
+ * {@link Properties} is one marker byte, followed for non-null values by the entry count and
+ * the name/value strings.
+ */
+public final class ProxyRecoverableSerializer
+    implements SimpleVersionedSerializer<ProxyRecoverable> {
+
+  public static final ProxyRecoverableSerializer INSTANCE = new ProxyRecoverableSerializer();
+
+  public static final int VERSION = 1;
+
+  /** Marker byte written in place of a {@code null} string or properties object. */
+  private static final byte NULL_MARKER = 0;
+
+  /** Marker byte written before a non-null string or properties object. */
+  private static final byte VALUE_MARKER = 1;
+
+  private static final String CHARSET = "UTF-8";
+
+  /** Initial capacity of the output buffer; the buffer grows as needed. */
+  private static final int INITIAL_BUFFER_SIZE = 10240;
+
+  private ProxyRecoverableSerializer() {
+    // private constructor.
+  }
+
+  @Override
+  public int getVersion() {
+    return VERSION;
+  }
+
+  /**
+   * Serializes the recoverable into a byte array of exactly the required length.
+   *
+   * @param proxyRecoverable the recoverable to serialize
+   * @return the serialized bytes
+   */
+  @Override
+  public byte[] serialize(final ProxyRecoverable proxyRecoverable) {
+    // A growable stream is used so that large configurations do not overflow a fixed buffer.
+    final ByteArrayOutputStream bytes = new ByteArrayOutputStream(INITIAL_BUFFER_SIZE);
+    final DataOutputStream output = new DataOutputStream(bytes);
+    try {
+      serializeString(output, proxyRecoverable.getWriterType());
+      serializeConfiguration(output, proxyRecoverable.getWriterConfiguration());
+      serializeString(output, proxyRecoverable.getPartition());
+      output.flush();
+    } catch (IOException exception) {
+      throw new RuntimeException(exception);
+    }
+    return bytes.toByteArray();
+  }
+
+  /** Writes the configuration fields in the fixed order read by deserializeConfiguration. */
+  private static void serializeConfiguration(
+      final DataOutputStream output,
+      final ProxyFileWriterFactory.Configuration configuration
+  ) throws IOException {
+    serializeString(output, configuration.getDatabaseName());
+    serializeString(output, configuration.getTableName());
+    serializeString(output, configuration.getTablePath());
+    serializeProperties(output, configuration.getTableProperties());
+    serializeProperties(output, configuration.getWriterProperties());
+    serializeProperties(output, configuration.getCarbonProperties());
+  }
+
+  /** Writes a nullable string: marker byte, then byte length and UTF-8 bytes if non-null. */
+  private static void serializeString(final DataOutputStream output, final String string)
+      throws IOException {
+    if (string == null) {
+      output.writeByte(NULL_MARKER);
+      return;
+    }
+    output.writeByte(VALUE_MARKER);
+    final byte[] stringBytes = string.getBytes(CHARSET);
+    output.writeInt(stringBytes.length);
+    output.write(stringBytes);
+  }
+
+  /** Writes a nullable properties object: marker byte, then entry count and entries. */
+  private static void serializeProperties(
+      final DataOutputStream output,
+      final Properties properties
+  ) throws IOException {
+    if (properties == null) {
+      output.writeByte(NULL_MARKER);
+      return;
+    }
+    output.writeByte(VALUE_MARKER);
+    // The count must come from the same set that is iterated: properties.size() can differ
+    // from stringPropertyNames() (defaults, non-String entries) and would corrupt the stream.
+    final Set<String> propertyNames = properties.stringPropertyNames();
+    output.writeInt(propertyNames.size());
+    for (String propertyName : propertyNames) {
+      serializeString(output, propertyName);
+      serializeString(output, properties.getProperty(propertyName));
+    }
+  }
+
+  /**
+   * Rebuilds a recoverable from bytes produced by {@link #serialize(ProxyRecoverable)}.
+   *
+   * @param version the version the bytes were written with
+   * @param bytes   the serialized recoverable
+   * @return the deserialized recoverable
+   * @throws UnsupportedOperationException if {@code version} is not {@link #VERSION}
+   */
+  @Override
+  public ProxyRecoverable deserialize(final int version, final byte[] bytes) {
+    if (version != VERSION) {
+      throw new UnsupportedOperationException("Unsupported version: " + version + ".");
+    }
+    final ByteBuffer byteBuffer = ByteBuffer.wrap(bytes);
+    final String writerType = deserializeString(byteBuffer);
+    final ProxyFileWriterFactory.Configuration writerConfiguration =
+        deserializeConfiguration(byteBuffer);
+    final String partition = deserializeString(byteBuffer);
+    return new ProxyRecoverable(writerType, writerConfiguration, partition);
+  }
+
+  /** Reads the configuration fields in the order written by serializeConfiguration. */
+  private static ProxyFileWriterFactory.Configuration deserializeConfiguration(
+      final ByteBuffer byteBuffer
+  ) {
+    final String databaseName = deserializeString(byteBuffer);
+    final String tableName = deserializeString(byteBuffer);
+    final String tablePath = deserializeString(byteBuffer);
+    final Properties tableProperties = deserializeProperties(byteBuffer);
+    final Properties writerProperties = deserializeProperties(byteBuffer);
+    final Properties carbonProperties = deserializeProperties(byteBuffer);
+    return new ProxyFileWriterFactory.Configuration(
+        databaseName,
+        tableName,
+        tablePath,
+        tableProperties,
+        writerProperties,
+        carbonProperties
+    );
+  }
+
+  /** Reads a nullable string written by serializeString. */
+  private static String deserializeString(final ByteBuffer byteBuffer) {
+    final byte marker = byteBuffer.get();
+    switch (marker) {
+      case NULL_MARKER:
+        return null;
+      case VALUE_MARKER:
+        final int stringByteLength = byteBuffer.getInt();
+        final byte[] stringBytes = new byte[stringByteLength];
+        byteBuffer.get(stringBytes);
+        try {
+          return new String(stringBytes, CHARSET);
+        } catch (UnsupportedEncodingException exception) {
+          throw new RuntimeException(exception);
+        }
+      default:
+        throw new RuntimeException("Unexpected string marker byte [" + marker + "].");
+    }
+  }
+
+  /** Reads a nullable properties object written by serializeProperties. */
+  private static Properties deserializeProperties(final ByteBuffer byteBuffer) {
+    final byte marker = byteBuffer.get();
+    switch (marker) {
+      case NULL_MARKER:
+        return null;
+      case VALUE_MARKER:
+        final int propertyCount = byteBuffer.getInt();
+        final Properties properties = new Properties();
+        for (int index = 0; index < propertyCount; index++) {
+          // Read the name before the value; that is the order in which they were written.
+          final String propertyName = deserializeString(byteBuffer);
+          final String propertyValue = deserializeString(byteBuffer);
+          properties.setProperty(propertyName, propertyValue);
+        }
+        return properties;
+      default:
+        throw new RuntimeException("Unexpected properties marker byte [" + marker + "].");
+    }
+  }
+
+}
diff --git 
a/integration/flink-proxy/src/main/java/org/apache/carbon/flink/ProxyRecoverableWriter.java
 
b/integration/flink-proxy/src/main/java/org/apache/carbon/flink/ProxyRecoverableWriter.java
new file mode 100644
index 0000000..3713ac2
--- /dev/null
+++ 
b/integration/flink-proxy/src/main/java/org/apache/carbon/flink/ProxyRecoverableWriter.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbon.flink;
+
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.fs.RecoverableFsDataOutputStream;
+import org.apache.flink.core.fs.RecoverableWriter;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+
+/**
+ * {@link RecoverableWriter} of the proxy file system. It opens
+ * {@link ProxyRecoverableOutputStream}s and rebuilds committers from {@link ProxyRecoverable}s;
+ * resuming and cleanup of recoverable state are not supported.
+ */
+public final class ProxyRecoverableWriter implements RecoverableWriter {
+
+  /** Returns a new, unbound stream; {@code path} is not used. */
+  @Override
+  public ProxyRecoverableOutputStream open(final Path path) {
+    return new ProxyRecoverableOutputStream();
+  }
+
+  /** Resuming is not supported, see {@link #supportsResume()}. */
+  @Override
+  public RecoverableFsDataOutputStream recover(final ResumeRecoverable resumeRecoverable) {
+    throw new UnsupportedOperationException();
+  }
+
+  /**
+   * Wraps the given recoverable in a committer.
+   *
+   * @throws IllegalArgumentException if the recoverable is not a {@link ProxyRecoverable}
+   */
+  @Override
+  public ProxyRecoverableOutputStream.Committer recoverForCommit(
+      final CommitRecoverable commitRecoverable
+  ) {
+    if (commitRecoverable instanceof ProxyRecoverable) {
+      return new ProxyRecoverableOutputStream.Committer((ProxyRecoverable) commitRecoverable);
+    }
+    throw new IllegalArgumentException(
+        "ProxyFileSystem can not recover recoverable for other file system. "
+            + commitRecoverable
+    );
+  }
+
+  @Override
+  public boolean supportsResume() {
+    return false;
+  }
+
+  @Override
+  public boolean requiresCleanupOfRecoverableState() {
+    return false;
+  }
+
+  /** Cleanup is not supported, see {@link #requiresCleanupOfRecoverableState()}. */
+  @Override
+  public boolean cleanupRecoverableState(final ResumeRecoverable resumeRecoverable) {
+    throw new UnsupportedOperationException();
+  }
+
+  /** Returns the shared {@link ProxyRecoverableSerializer} viewed as a commit serializer. */
+  @SuppressWarnings("unchecked")
+  @Override
+  public SimpleVersionedSerializer<CommitRecoverable> getCommitRecoverableSerializer() {
+    return (SimpleVersionedSerializer<CommitRecoverable>)
+        (SimpleVersionedSerializer<?>) ProxyRecoverableSerializer.INSTANCE;
+  }
+
+  /** Returns the shared {@link ProxyRecoverableSerializer} viewed as a resume serializer. */
+  @SuppressWarnings("unchecked")
+  @Override
+  public SimpleVersionedSerializer<ResumeRecoverable> getResumeRecoverableSerializer() {
+    return (SimpleVersionedSerializer<ResumeRecoverable>)
+        (SimpleVersionedSerializer<?>) ProxyRecoverableSerializer.INSTANCE;
+  }
+
+}
diff --git 
a/integration/flink-proxy/src/main/resources/META-INF/services/org.apache.flink.core.fs.FileSystemFactory
 
b/integration/flink-proxy/src/main/resources/META-INF/services/org.apache.flink.core.fs.FileSystemFactory
new file mode 100644
index 0000000..f8a0ed8
--- /dev/null
+++ 
b/integration/flink-proxy/src/main/resources/META-INF/services/org.apache.flink.core.fs.FileSystemFactory
@@ -0,0 +1 @@
+org.apache.carbon.flink.ProxyFileSystemFactory
\ No newline at end of file
diff --git a/integration/flink/pom.xml b/integration/flink/pom.xml
new file mode 100644
index 0000000..80746c0
--- /dev/null
+++ b/integration/flink/pom.xml
@@ -0,0 +1,267 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+    <parent>
+        <groupId>org.apache.carbondata</groupId>
+        <artifactId>carbondata-parent</artifactId>
+        <version>2.0.0-SNAPSHOT</version>
+        <relativePath>../../pom.xml</relativePath>
+    </parent>
+
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>carbondata-flink</artifactId>
+    <name>Apache CarbonData :: Flink</name>
+    <packaging>jar</packaging>
+
+    <properties>
+        <dev.path>${basedir}/../../dev</dev.path>
+        <hadoop.version>2.7.5</hadoop.version>
+    </properties>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.carbondata</groupId>
+            <artifactId>carbondata-flink-proxy</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.carbondata</groupId>
+            <artifactId>carbondata-format</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.carbondata</groupId>
+            <artifactId>carbondata-processing</artifactId>
+            <version>${project.version}</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.apache.spark</groupId>
+                    <artifactId>*</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.hive</groupId>
+                    <artifactId>*</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.flink</groupId>
+                    <artifactId>*</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.hadoop</groupId>
+                    <artifactId>*</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.arrow</groupId>
+                    <artifactId>*</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.zookeeper</groupId>
+                    <artifactId>*</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.huaweicloud</groupId>
+                    <artifactId>*</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>com.fasterxml.jackson.core</groupId>
+                    <artifactId>jackson-annotations</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.carbondata</groupId>
+            <artifactId>carbondata-hadoop</artifactId>
+            <version>${project.version}</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.apache.spark</groupId>
+                    <artifactId>*</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.hive</groupId>
+                    <artifactId>*</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.flink</groupId>
+                    <artifactId>*</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.hadoop</groupId>
+                    <artifactId>*</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.arrow</groupId>
+                    <artifactId>*</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.zookeeper</groupId>
+                    <artifactId>*</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.huaweicloud</groupId>
+                    <artifactId>*</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>com.fasterxml.jackson.core</groupId>
+                    <artifactId>jackson-annotations</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.carbondata</groupId>
+            <artifactId>carbondata-store-sdk</artifactId>
+            <version>${project.version}</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.apache.spark</groupId>
+                    <artifactId>*</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.hive</groupId>
+                    <artifactId>*</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.flink</groupId>
+                    <artifactId>*</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.hadoop</groupId>
+                    <artifactId>*</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.arrow</groupId>
+                    <artifactId>*</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.zookeeper</groupId>
+                    <artifactId>*</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.carbondata</groupId>
+                    <artifactId>carbondata-streaming</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.huaweicloud</groupId>
+                    <artifactId>*</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>com.fasterxml.jackson.core</groupId>
+                    <artifactId>jackson-annotations</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-common</artifactId>
+            <version>2.7.5</version>
+        </dependency>
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.scalatest</groupId>
+            <artifactId>scalatest_2.11</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-client</artifactId>
+            <version>2.7.5</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>io.netty</groupId>
+            <artifactId>netty-all</artifactId>
+            <version>4.1.17.Final</version>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
+
+    <profiles>
+        <profile>
+            <id>spark-2.2</id>
+            <activation>
+                <activeByDefault>false</activeByDefault>
+            </activation>
+            <dependencies>
+                <dependency>
+                    <groupId>org.apache.carbondata</groupId>
+                    <artifactId>carbondata-spark2</artifactId>
+                    <version>${project.version}</version>
+                    <scope>test</scope>
+                    <exclusions>
+                        <exclusion>
+                            <groupId>org.apache.hive</groupId>
+                            <artifactId>hive-exec</artifactId>
+                        </exclusion>
+                    </exclusions>
+                </dependency>
+            </dependencies>
+        </profile>
+        <profile>
+            <id>spark-2.3</id>
+            <activation>
+                <activeByDefault>true</activeByDefault>
+            </activation>
+            <dependencies>
+                <dependency>
+                    <groupId>org.apache.carbondata</groupId>
+                    <artifactId>carbondata-spark2</artifactId>
+                    <version>${project.version}</version>
+                    <scope>test</scope>
+                    <exclusions>
+                        <exclusion>
+                            <groupId>org.apache.hive</groupId>
+                            <artifactId>hive-exec</artifactId>
+                        </exclusion>
+                    </exclusions>
+                </dependency>
+            </dependencies>
+        </profile>
+    </profiles>
+
+    <build>
+        <plugins>
+            <plugin>
+                <artifactId>maven-compiler-plugin</artifactId>
+                <configuration>
+                    <source>1.7</source>
+                    <target>1.7</target>
+                </configuration>
+            </plugin>
+            <plugin>
+                <groupId>org.scala-tools</groupId>
+                <artifactId>maven-scala-plugin</artifactId>
+                <version>2.15.2</version>
+                <executions>
+                    <execution>
+                        <id>compile</id>
+                        <goals>
+                            <goal>compile</goal>
+                        </goals>
+                        <phase>compile</phase>
+                    </execution>
+                    <execution>
+                        <id>testCompile</id>
+                        <goals>
+                            <goal>testCompile</goal>
+                        </goals>
+                        <phase>test</phase>
+                    </execution>
+                    <execution>
+                        <goals>
+                            <goal>compile</goal>
+                        </goals>
+                        <phase>process-resources</phase>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+
+</project>
\ No newline at end of file
diff --git 
a/integration/flink/src/main/java/org/apache/carbon/core/metadata/StageManager.java
 
b/integration/flink/src/main/java/org/apache/carbon/core/metadata/StageManager.java
new file mode 100644
index 0000000..ad38260
--- /dev/null
+++ 
b/integration/flink/src/main/java/org/apache/carbon/core/metadata/StageManager.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbon.core.metadata;
+
+import java.io.BufferedWriter;
+import java.io.ByteArrayInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.nio.charset.StandardCharsets;
+
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.fileoperations.AtomicFileOperationFactory;
+import org.apache.carbondata.core.fileoperations.AtomicFileOperations;
+import org.apache.carbondata.core.fileoperations.FileWriteOperation;
+import org.apache.carbondata.core.statusmanager.StageInput;
+import org.apache.carbondata.core.util.CarbonUtil;
+import org.apache.carbondata.core.util.path.CarbonTablePath;
+
+import com.google.gson.Gson;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.log4j.Logger;
+
+/**
+ * Writes stage input metadata: a JSON file describing a {@link StageInput} plus an empty
+ * success marker file next to it.
+ */
+public final class StageManager {
+
+  private static final Logger LOGGER =
+      LogServiceFactory.getLogService(StageManager.class.getName());
+
+  /**
+   * Writes {@code stageInput} as JSON to {@code stageInputPath} and then creates the success
+   * file {@code stageInputPath + CarbonTablePath.SUCCESS_FILE_SUBFIX}. If the success file
+   * cannot be written, the stage input file is deleted (best effort) and the failure is
+   * rethrown.
+   *
+   * @param stageInputPath path of the stage input file
+   * @param stageInput     the stage input to serialize
+   * @throws IOException if writing the stage input or the success file fails
+   */
+  public static void writeStageInput(final String stageInputPath, final StageInput stageInput)
+      throws IOException {
+    final AtomicFileOperations fileWrite =
+        AtomicFileOperationFactory.getAtomicFileOperations(stageInputPath);
+    BufferedWriter writer = null;
+    try {
+      final DataOutputStream dataOutputStream =
+          fileWrite.openForWrite(FileWriteOperation.OVERWRITE);
+      writer = new BufferedWriter(
+          new OutputStreamWriter(dataOutputStream, StandardCharsets.UTF_8));
+      writer.write(new Gson().toJson(stageInput));
+      // Flush here rather than in finally, so a flush failure marks the operation as failed
+      // and cannot prevent the cleanup below from running.
+      writer.flush();
+    } catch (IOException e) {
+      LOGGER.error("Error message: " + e.getLocalizedMessage(), e);
+      fileWrite.setFailed();
+      throw e;
+    } finally {
+      try {
+        CarbonUtil.closeStreams(writer);
+      } finally {
+        // Always finish the atomic file operation, even if closing the stream fails.
+        fileWrite.close();
+      }
+    }
+
+    try {
+      writeSuccessFile(stageInputPath + CarbonTablePath.SUCCESS_FILE_SUBFIX);
+    } catch (Throwable exception) {
+      try {
+        CarbonUtil.deleteFoldersAndFiles(FileFactory.getCarbonFile(stageInputPath));
+      } catch (Throwable e) {
+        // Log the deletion failure itself; the original failure is rethrown below.
+        LOGGER.error("Fail to delete stage input meta data [" + stageInputPath + "].", e);
+      }
+      throw exception;
+    }
+  }
+
+  /**
+   * Creates an empty file at {@code successFilePath}. A failure to close the stream is logged
+   * and not propagated.
+   */
+  private static void writeSuccessFile(final String successFilePath) throws IOException {
+    final DataOutputStream segmentStatusSuccessOutputStream =
+        FileFactory.getDataOutputStream(
+            successFilePath, FileFactory.getFileType(successFilePath),
+            CarbonCommonConstants.BYTEBUFFER_SIZE, 1024);
+    try {
+      // Copies zero bytes: the marker file is intentionally empty.
+      IOUtils.copyBytes(
+          new ByteArrayInputStream(new byte[0]),
+          segmentStatusSuccessOutputStream,
+          CarbonCommonConstants.BYTEBUFFER_SIZE);
+      segmentStatusSuccessOutputStream.flush();
+    } finally {
+      try {
+        CarbonUtil.closeStream(segmentStatusSuccessOutputStream);
+      } catch (IOException exception) {
+        LOGGER.error(exception.getMessage(), exception);
+      }
+    }
+  }
+
+}
diff --git 
a/integration/flink/src/main/java/org/apache/carbon/flink/CarbonLocalProperty.java
 
b/integration/flink/src/main/java/org/apache/carbon/flink/CarbonLocalProperty.java
new file mode 100644
index 0000000..3383e8c
--- /dev/null
+++ 
b/integration/flink/src/main/java/org/apache/carbon/flink/CarbonLocalProperty.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbon.flink;
+
+/**
+ * Property keys of the local carbon writer. This class only holds constants and cannot be
+ * instantiated.
+ */
+public final class CarbonLocalProperty {
+
+  private CarbonLocalProperty() {
+    // constants holder, never instantiated.
+  }
+
+  /** Key of the local data path property. */
+  public static final String DATA_PATH = "carbon.writer.local.data.path";
+
+  /** Key of the local temporary data path property. */
+  public static final String DATA_TEMP_PATH = "carbon.writer.local.data.temp.path";
+
+}
diff --git 
a/integration/flink/src/main/java/org/apache/carbon/flink/CarbonLocalWriter.java
 
b/integration/flink/src/main/java/org/apache/carbon/flink/CarbonLocalWriter.java
new file mode 100644
index 0000000..dcfe8b3
--- /dev/null
+++ 
b/integration/flink/src/main/java/org/apache/carbon/flink/CarbonLocalWriter.java
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbon.flink;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import 
org.apache.carbondata.core.datastore.exception.CarbonDataWriterException;
+import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.statusmanager.StageInput;
+import org.apache.carbondata.core.util.CarbonUtil;
+import org.apache.carbondata.core.util.path.CarbonTablePath;
+
+import org.apache.carbon.core.metadata.StageManager;
+import org.apache.commons.io.FileUtils;
+import org.apache.log4j.Logger;
+
+/**
+ * Carbon writer that first writes data below a local temporary path and, on commit, copies the
+ * written segment files to a data path on the local file system and records them as a stage
+ * input of the table.
+ */
+final class CarbonLocalWriter extends CarbonWriter {
+
+  private static final Logger LOGGER =
+      LogServiceFactory.getLogService(CarbonLocalWriter.class.getName());
+
+  /**
+   * Creates the writer and registers the factory class under its type name.
+   *
+   * @param factory the factory which creates this writer
+   * @param table the table to which the data is committed
+   * @param writer the SDK writer receiving the elements; the factory passes {@code null} when
+   *     the writer is created for an already known partition
+   * @param writePath the local temporary path the SDK writer writes to
+   * @param writePartition the identifier of the partition written by this writer
+   */
+  CarbonLocalWriter(
+      final CarbonLocalWriterFactory factory,
+      final CarbonTable table,
+      final org.apache.carbondata.sdk.file.CarbonWriter writer,
+      final String writePath,
+      final String writePartition
+  ) {
+    ProxyFileWriterFactory.register(factory.getType(), factory.getClass());
+    if (LOGGER.isDebugEnabled()) {
+      LOGGER.debug("Open writer. " + this.toString());
+    }
+    this.factory = factory;
+    this.table = table;
+    this.writer = writer;
+    this.writePath = writePath;
+    this.writePartition = writePartition;
+    // Nothing has been written yet, so there is nothing to flush.
+    this.flushed = true;
+  }
+
+  private final CarbonLocalWriterFactory factory;
+
+  private final CarbonTable table;
+
+  private final org.apache.carbondata.sdk.file.CarbonWriter writer;
+
+  private final String writePath;
+
+  private final String writePartition;
+
+  /** {@code false} while elements have been added which are not flushed yet. */
+  private volatile boolean flushed;
+
+  @Override
+  public CarbonLocalWriterFactory getFactory() {
+    return this.factory;
+  }
+
+  @Override
+  public String getPartition() {
+    return this.writePartition;
+  }
+
+  /**
+   * Hands the element to the SDK writer and marks this writer as not flushed.
+   */
+  @Override
+  public void addElement(final String element) throws IOException {
+    this.writer.write(element);
+    this.flushed = false;
+  }
+
+  /**
+   * Closes the SDK writer if elements were added since the last flush.
+   */
+  @Override
+  public void flush() throws IOException {
+    if (LOGGER.isDebugEnabled()) {
+      LOGGER.debug("Flush writer. " + this.toString());
+    }
+    synchronized (this) {
+      if (!this.flushed) {
+        this.writer.close();
+        this.flushed = true;
+      }
+    }
+  }
+
+  /**
+   * Flushes the writer if it holds elements which are not flushed yet.
+   */
+  @Override
+  public void finish() throws IOException {
+    if (LOGGER.isDebugEnabled()) {
+      LOGGER.debug("Finish writer. " + this.toString());
+    }
+    if (!this.flushed) {
+      this.flush();
+    }
+  }
+
+  /**
+   * Copies the locally written segment files to the configured data path and writes a stage
+   * input describing them into the stage directory of the table. The local write path is
+   * deleted afterwards, whether the commit succeeded or not.
+   *
+   * @throws IllegalArgumentException if the data path writer property is not set
+   * @throws IOException if copying the files or writing the stage input fails
+   */
+  @Override
+  public void commit() throws IOException {
+    if (LOGGER.isDebugEnabled()) {
+      LOGGER.debug("Commit write. " + this.toString());
+    }
+    try {
+      final Properties writerProperties = this.factory.getConfiguration().getWriterProperties();
+      String dataPath = writerProperties.getProperty(CarbonLocalProperty.DATA_PATH);
+      if (dataPath == null) {
+        throw new IllegalArgumentException(
+            "Writer property [" + CarbonLocalProperty.DATA_PATH + "] is not set."
+        );
+      }
+      // The database name is appended directly, so make sure the configured base path is
+      // terminated; otherwise "/data" + "db" would silently become "/datadb".
+      if (!dataPath.isEmpty() && !dataPath.endsWith("/") && !dataPath.endsWith(File.separator)) {
+        dataPath = dataPath + "/";
+      }
+      dataPath = dataPath + this.table.getDatabaseName() + "/"
+          + this.table.getTableName() + "/" + this.writePartition + "/";
+      try {
+        // The upload is part of the guarded section so that files copied before a failure
+        // are removed again instead of being left behind under the data path.
+        final Map<String, Long> fileList =
+            this.uploadSegmentDataFiles(this.writePath + "Fact/Part0/Segment_null/", dataPath);
+        final String stageDir = CarbonTablePath.getStageDir(
+            table.getAbsoluteTableIdentifier().getTablePath());
+        tryCreateLocalDirectory(new File(stageDir));
+        final String stageInputPath = stageDir + "/" + this.writePartition;
+        StageManager.writeStageInput(stageInputPath, new StageInput(dataPath, fileList));
+      } catch (Throwable exception) {
+        this.deleteSegmentDataFilesQuietly(dataPath);
+        throw exception;
+      }
+    } finally {
+      try {
+        FileUtils.deleteDirectory(new File(this.writePath));
+      } catch (IOException exception) {
+        LOGGER.error("Fail to delete write path [" + this.writePath + "].", exception);
+      }
+    }
+  }
+
+  /**
+   * Closes the SDK writer if necessary and deletes the local write path. Failures are logged
+   * and not propagated. Does nothing when this writer has no SDK writer.
+   */
+  @Override
+  public void close() {
+    if (this.writer == null) {
+      return;
+    }
+    try {
+      synchronized (this) {
+        if (!this.flushed) {
+          this.writer.close();
+          this.flushed = true;
+        }
+      }
+    } catch (Throwable exception) {
+      LOGGER.error("Fail to close carbon writer.", exception);
+    } finally {
+      try {
+        FileUtils.deleteDirectory(new File(this.writePath));
+      } catch (IOException exception) {
+        LOGGER.error("Fail to delete write path [" + this.writePath + "].", exception);
+      }
+    }
+  }
+
+  /**
+   * Copies every file found directly in {@code localPath} to {@code remotePath}.
+   *
+   * @param localPath the local directory holding the written segment files
+   * @param remotePath the target directory, expected to end with a path separator
+   * @return the name and length of every copied file; empty if {@code localPath} cannot be
+   *     listed
+   * @throws IOException if a target file can not be created
+   */
+  private Map<String, Long> uploadSegmentDataFiles(final String localPath, final String remotePath)
+      throws IOException {
+    final File[] files = new File(localPath).listFiles();
+    if (files == null) {
+      return new HashMap<>(0);
+    }
+    Map<String, Long> fileNameMapLength = new HashMap<>(files.length);
+    for (File file : files) {
+      fileNameMapLength.put(file.getName(), file.length());
+      if (LOGGER.isDebugEnabled()) {
+        LOGGER.debug("Upload file[" + file.getAbsolutePath() + "] to [" + remotePath + "] start.");
+      }
+      try {
+        // Make sure the target file and its parent directories exist before copying.
+        final File remoteFile = new File(remotePath + file.getName());
+        if (!remoteFile.exists()) {
+          tryCreateLocalFile(remoteFile);
+        }
+        CarbonUtil.copyCarbonDataFileToCarbonStorePath(file.getAbsolutePath(), remotePath, 1024);
+      } catch (CarbonDataWriterException exception) {
+        LOGGER.error(exception.getMessage(), exception);
+        throw exception;
+      }
+      if (LOGGER.isDebugEnabled()) {
+        LOGGER.debug("Upload file[" + file.getAbsolutePath() + "] to [" + remotePath + "] end.");
+      }
+    }
+    return fileNameMapLength;
+  }
+
+  /**
+   * Deletes the given segment data path; a failure is logged and not propagated.
+   */
+  private void deleteSegmentDataFilesQuietly(final String segmentDataPath) {
+    try {
+      CarbonUtil.deleteFoldersAndFiles(FileFactory.getCarbonFile(segmentDataPath));
+    } catch (Throwable exception) {
+      LOGGER.error("Fail to delete segment data path [" + segmentDataPath + "].", exception);
+    }
+  }
+
+  /**
+   * Creates the file together with its missing parent directories, unless it already exists.
+   *
+   * @throws IOException if the file can not be created
+   */
+  private static void tryCreateLocalFile(final File file) throws IOException {
+    if (file.exists()) {
+      return;
+    }
+    if (file.getParentFile() != null) {
+      tryCreateLocalDirectory(file.getParentFile());
+    }
+    if (!file.createNewFile()) {
+      throw new IOException("File [" + file.getCanonicalPath() + "] is exist.");
+    }
+  }
+
+  /**
+   * Creates the directory together with its missing parent directories, unless it already
+   * exists.
+   *
+   * @throws IOException if the directory does not exist and can not be created
+   */
+  private static void tryCreateLocalDirectory(final File file) throws IOException {
+    if (file.exists()) {
+      return;
+    }
+    if (file.getParentFile() != null) {
+      tryCreateLocalDirectory(file.getParentFile());
+    }
+    if (!file.mkdir()) {
+      // mkdir() also returns false when the directory has been created concurrently; only
+      // that case is harmless, every other failure must not be ignored.
+      if (!file.isDirectory()) {
+        throw new IOException("Fail to create directory [" + file.getCanonicalPath() + "].");
+      }
+      if (LOGGER.isDebugEnabled()) {
+        LOGGER.debug("Directory [" + file.getCanonicalPath() + "] is exist.");
+      }
+    }
+  }
+
+}
diff --git 
a/integration/flink/src/main/java/org/apache/carbon/flink/CarbonLocalWriterFactory.java
 
b/integration/flink/src/main/java/org/apache/carbon/flink/CarbonLocalWriterFactory.java
new file mode 100644
index 0000000..4c24a8b
--- /dev/null
+++ 
b/integration/flink/src/main/java/org/apache/carbon/flink/CarbonLocalWriterFactory.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbon.flink;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.UUID;
+
+import org.apache.carbondata.common.exceptions.sql.InvalidLoadOptionException;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.metadata.schema.table.TableInfo;
+import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
+import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
+import org.apache.carbondata.sdk.file.CarbonWriter;
+import org.apache.carbondata.sdk.file.Schema;
+
+/**
+ * Factory for {@link CarbonLocalWriter} instances. The temporary write path prefix is read
+ * from the writer property {@link CarbonLocalProperty#DATA_TEMP_PATH}.
+ */
+public final class CarbonLocalWriterFactory extends CarbonWriterFactory {
+
+  private static final long serialVersionUID = 2822670807460968078L;
+
+  @Override
+  public String getType() {
+    return CarbonLocalWriterFactoryBuilder.TYPE;
+  }
+
+  /**
+   * Creates a writer for a new, randomly named partition. The SDK writer works on a copy of
+   * the table whose table path points to the temporary write path.
+   */
+  @Override
+  protected CarbonLocalWriter create0() throws IOException {
+    final String tempPath = this.getWriteTempPath();
+    final String partition = UUID.randomUUID().toString().replace("-", "");
+    final String localPath = tempPath + "_" + partition + "/";
+    final CarbonTable sourceTable = this.getTable();
+    // Copy the table through serialization so that changing the path leaves the source as is.
+    final TableInfo copiedInfo = TableInfo.deserialize(sourceTable.getTableInfo().serialize());
+    final CarbonTable workingTable = CarbonTable.buildFromTableInfo(copiedInfo);
+    workingTable.getTableInfo().setTablePath(localPath);
+    final org.apache.carbondata.sdk.file.CarbonWriter sdkWriter;
+    try {
+      sdkWriter = CarbonWriter.builder()
+          .outputPath("")
+          .writtenBy("flink")
+          .withTable(workingTable)
+          .withTableProperties(this.getTableProperties())
+          .withJsonInput(this.getTableSchema(workingTable))
+          .build();
+    } catch (InvalidLoadOptionException exception) {
+      // TODO
+      throw new UnsupportedOperationException(exception);
+    }
+    return new CarbonLocalWriter(this, sourceTable, sdkWriter, localPath, partition);
+  }
+
+  /**
+   * Creates a writer for the given partition. No SDK writer is attached to it.
+   */
+  @Override
+  protected CarbonLocalWriter create0(final String partition) throws IOException {
+    final String localPath = this.getWriteTempPath() + "_" + partition + "/";
+    return new CarbonLocalWriter(this, this.getTable(), null, localPath, partition);
+  }
+
+  /**
+   * Reads the temporary write path prefix from the writer properties.
+   *
+   * @throws IllegalArgumentException if the property is not set
+   */
+  private String getWriteTempPath() {
+    final String tempPath = this.getConfiguration().getWriterProperties()
+        .getProperty(CarbonLocalProperty.DATA_TEMP_PATH);
+    if (tempPath == null) {
+      throw new IllegalArgumentException(
+          "Writer property [" + CarbonLocalProperty.DATA_TEMP_PATH + "] is not set."
+      );
+    }
+    return tempPath;
+  }
+
+  /**
+   * Builds the SDK schema from the column schemas of the table, in create order.
+   */
+  private Schema getTableSchema(final CarbonTable table) {
+    final List<CarbonColumn> columns = table.getCreateOrderColumn();
+    final List<ColumnSchema> schemas = new ArrayList<>(columns.size());
+    for (CarbonColumn each : columns) {
+      schemas.add(each.getColumnSchema());
+    }
+    return new Schema(schemas);
+  }
+
+  /**
+   * Copies the configured table properties into a map.
+   */
+  private Map<String, String> getTableProperties() {
+    final Properties source = this.getConfiguration().getTableProperties();
+    final Map<String, String> copy = new HashMap<>(source.size());
+    for (String key : source.stringPropertyNames()) {
+      copy.put(key, source.getProperty(key));
+    }
+    return copy;
+  }
+
+}
diff --git 
a/integration/flink/src/main/java/org/apache/carbon/flink/CarbonLocalWriterFactoryBuilder.java
 
b/integration/flink/src/main/java/org/apache/carbon/flink/CarbonLocalWriterFactoryBuilder.java
new file mode 100644
index 0000000..74d1e41
--- /dev/null
+++ 
b/integration/flink/src/main/java/org/apache/carbon/flink/CarbonLocalWriterFactoryBuilder.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbon.flink;
+
+import java.util.Properties;
+
+/**
+ * Builder of {@link CarbonLocalWriterFactory} instances, registered under the type
+ * {@link #TYPE}.
+ */
+public final class CarbonLocalWriterFactoryBuilder extends CarbonWriterFactoryBuilder {
+
+  public static final String TYPE = "Local";
+
+  @Override
+  public String getType() {
+    return TYPE;
+  }
+
+  /**
+   * Creates a local writer factory configured with the given table and property settings.
+   */
+  @Override
+  public CarbonLocalWriterFactory build(
+      final String databaseName,
+      final String tableName,
+      final String tablePath,
+      final Properties tableProperties,
+      final Properties writerProperties,
+      final Properties carbonProperties
+  ) {
+    final ProxyFileWriterFactory.Configuration settings = new ProxyFileWriterFactory.Configuration(
+        databaseName, tableName, tablePath, tableProperties, writerProperties, carbonProperties);
+    final CarbonLocalWriterFactory result = new CarbonLocalWriterFactory();
+    result.setConfiguration(settings);
+    return result;
+  }
+}
diff --git 
a/integration/flink/src/main/java/org/apache/carbon/flink/CarbonS3Property.java 
b/integration/flink/src/main/java/org/apache/carbon/flink/CarbonS3Property.java
new file mode 100644
index 0000000..6d9d94b
--- /dev/null
+++ 
b/integration/flink/src/main/java/org/apache/carbon/flink/CarbonS3Property.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbon.flink;
+
+/**
+ * Names of the writer properties that configure the S3 carbon writer.
+ * This class only holds constants and cannot be instantiated.
+ */
+final class CarbonS3Property {
+
+  /** Writer property holding the S3 access key. */
+  static final String ACCESS_KEY = "carbon.writer.s3.access.key";
+
+  /** Writer property holding the S3 secret key. */
+  static final String SECRET_KEY = "carbon.writer.s3.secret.key";
+
+  /** Writer property holding the S3 endpoint. */
+  static final String ENDPOINT = "carbon.writer.s3.endpoint";
+
+  /** Writer property naming the temporary path prefix under which data is first written. */
+  static final String DATA_TEMP_PATH = "carbon.writer.s3.data.temp.path";
+
+  /** Writer property naming the base path that data files are copied to on commit. */
+  static final String DATA_PATH = "carbon.writer.s3.data.path";
+
+  private CarbonS3Property() {
+    // private constructor.
+  }
+
+}
diff --git 
a/integration/flink/src/main/java/org/apache/carbon/flink/CarbonS3Writer.java 
b/integration/flink/src/main/java/org/apache/carbon/flink/CarbonS3Writer.java
new file mode 100644
index 0000000..e5aeca4
--- /dev/null
+++ 
b/integration/flink/src/main/java/org/apache/carbon/flink/CarbonS3Writer.java
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbon.flink;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import 
org.apache.carbondata.core.datastore.exception.CarbonDataWriterException;
+import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.statusmanager.StageInput;
+import org.apache.carbondata.core.util.CarbonUtil;
+import org.apache.carbondata.core.util.ThreadLocalSessionInfo;
+import org.apache.carbondata.core.util.path.CarbonTablePath;
+
+import org.apache.carbon.core.metadata.StageManager;
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.log4j.Logger;
+
+/**
+ * Carbon writer that first writes data below a local temporary path and, on commit, copies the
+ * written segment files to an s3a data path and records them as a stage input of the table.
+ */
+final class CarbonS3Writer extends CarbonWriter {
+
+  private static final Logger LOGGER =
+      LogServiceFactory.getLogService(CarbonS3Writer.class.getName());
+
+  /**
+   * Creates the writer and registers the factory class under its type name.
+   *
+   * @param factory the factory which creates this writer
+   * @param table the table to which the data is committed
+   * @param writer the SDK writer receiving the elements; the factory passes {@code null} when
+   *     the writer is created for an already known partition
+   * @param writePath the local temporary path the SDK writer writes to
+   * @param writePartition the identifier of the partition written by this writer
+   * @param configuration the hadoop configuration set to the current thread on commit
+   */
+  CarbonS3Writer(
+      final CarbonS3WriterFactory factory,
+      final CarbonTable table,
+      final org.apache.carbondata.sdk.file.CarbonWriter writer,
+      final String writePath,
+      final String writePartition,
+      final Configuration configuration
+  ) {
+    ProxyFileWriterFactory.register(factory.getType(), factory.getClass());
+    if (LOGGER.isDebugEnabled()) {
+      LOGGER.debug("Open writer. " + this.toString());
+    }
+    this.factory = factory;
+    this.table = table;
+    this.writer = writer;
+    this.writePath = writePath;
+    this.writePartition = writePartition;
+    this.configuration = configuration;
+    // Nothing has been written yet, so there is nothing to flush.
+    this.flushed = true;
+  }
+
+  private final CarbonS3WriterFactory factory;
+
+  private final CarbonTable table;
+
+  private final org.apache.carbondata.sdk.file.CarbonWriter writer;
+
+  private final String writePath;
+
+  private final String writePartition;
+
+  private final Configuration configuration;
+
+  /** {@code false} while elements have been added which are not flushed yet. */
+  private volatile boolean flushed;
+
+  @Override
+  public CarbonS3WriterFactory getFactory() {
+    return this.factory;
+  }
+
+  @Override
+  public String getPartition() {
+    return this.writePartition;
+  }
+
+  /**
+   * Hands the element to the SDK writer and marks this writer as not flushed.
+   */
+  @Override
+  public void addElement(final String element) throws IOException {
+    this.writer.write(element);
+    this.flushed = false;
+  }
+
+  /**
+   * Closes the SDK writer if elements were added since the last flush.
+   */
+  @Override
+  public void flush() throws IOException {
+    if (LOGGER.isDebugEnabled()) {
+      LOGGER.debug("Flush writer. " + this.toString());
+    }
+    synchronized (this) {
+      if (!this.flushed) {
+        this.writer.close();
+        this.flushed = true;
+      }
+    }
+  }
+
+  /**
+   * Flushes the writer if it holds elements which are not flushed yet.
+   */
+  @Override
+  public void finish() throws IOException {
+    if (LOGGER.isDebugEnabled()) {
+      LOGGER.debug("Finish writer. " + this.toString());
+    }
+    if (!this.flushed) {
+      this.flush();
+    }
+  }
+
+  /**
+   * Copies the locally written segment files to the configured s3a data path and writes a
+   * stage input describing them into the stage directory of the table. The local write path
+   * is deleted afterwards, whether the commit succeeded or not.
+   *
+   * @throws IllegalArgumentException if the data path writer property is not set or is not
+   *     an s3a path
+   * @throws IOException if writing the stage input fails
+   */
+  @Override
+  public void commit() throws IOException {
+    if (LOGGER.isDebugEnabled()) {
+      LOGGER.debug("Commit write. " + this.toString());
+    }
+    // Publish the hadoop configuration of this writer to the current thread before any
+    // file system access happens.
+    ThreadLocalSessionInfo.setConfigurationToCurrentThread(this.configuration);
+    ThreadLocalSessionInfo.getOrCreateCarbonSessionInfo()
+        .getNonSerializableExtraInfo().put("carbonConf", this.configuration);
+    try {
+      final Properties writerProperties = this.factory.getConfiguration().getWriterProperties();
+      String dataPath = writerProperties.getProperty(CarbonS3Property.DATA_PATH);
+      if (dataPath == null) {
+        throw new IllegalArgumentException(
+            "Writer property [" + CarbonS3Property.DATA_PATH + "] is not set."
+        );
+      }
+      if (!dataPath.startsWith(CarbonCommonConstants.S3A_PREFIX)) {
+        throw new IllegalArgumentException(
+            "Writer property [" + CarbonS3Property.DATA_PATH + "] is not a s3a path."
+        );
+      }
+      // The database name is appended directly, so make sure the configured base path is
+      // terminated; otherwise the last path element and the database name would be merged.
+      if (!dataPath.endsWith(CarbonCommonConstants.FILE_SEPARATOR)) {
+        dataPath = dataPath + CarbonCommonConstants.FILE_SEPARATOR;
+      }
+      dataPath = dataPath + this.table.getDatabaseName() + CarbonCommonConstants.FILE_SEPARATOR +
+          this.table.getTableName() + CarbonCommonConstants.FILE_SEPARATOR +
+          this.writePartition + CarbonCommonConstants.FILE_SEPARATOR;
+      try {
+        // The upload is part of the guarded section so that files copied before a failure
+        // are removed again instead of being left behind under the data path.
+        final Map<String, Long> fileList =
+            this.uploadSegmentDataFiles(this.writePath + "Fact/Part0/Segment_null/", dataPath);
+        final String stageInputPath = CarbonTablePath.getStageDir(
+            table.getAbsoluteTableIdentifier().getTablePath()) +
+            CarbonCommonConstants.FILE_SEPARATOR + this.writePartition;
+        StageManager.writeStageInput(stageInputPath, new StageInput(dataPath, fileList));
+      } catch (Throwable exception) {
+        this.deleteSegmentDataFilesQuietly(dataPath);
+        throw exception;
+      }
+    } finally {
+      try {
+        FileUtils.deleteDirectory(new File(this.writePath));
+      } catch (IOException exception) {
+        LOGGER.error("Fail to delete write path [" + this.writePath + "].", exception);
+      }
+    }
+  }
+
+  /**
+   * Closes the SDK writer if necessary and deletes the local write path. Failures are logged
+   * and not propagated. Does nothing when this writer has no SDK writer.
+   */
+  @Override
+  public void close() {
+    if (this.writer == null) {
+      return;
+    }
+    try {
+      synchronized (this) {
+        if (!this.flushed) {
+          this.writer.close();
+          this.flushed = true;
+        }
+      }
+    } catch (Throwable exception) {
+      LOGGER.error("Fail to close carbon writer.", exception);
+    } finally {
+      try {
+        FileUtils.deleteDirectory(new File(this.writePath));
+      } catch (IOException exception) {
+        LOGGER.error("Fail to delete write path [" + this.writePath + "].", exception);
+      }
+    }
+  }
+
+  /**
+   * Copies every file found directly in {@code localPath} to {@code remotePath}.
+   *
+   * @param localPath the local directory holding the written segment files
+   * @param remotePath the target directory
+   * @return the name and length of every copied file; empty if {@code localPath} cannot be
+   *     listed
+   */
+  private Map<String, Long> uploadSegmentDataFiles(
+      final String localPath, final String remotePath) {
+    final File[] files = new File(localPath).listFiles();
+    if (files == null) {
+      return new HashMap<>(0);
+    }
+    Map<String, Long> fileNameMapLength = new HashMap<>(files.length);
+    for (File file : files) {
+      fileNameMapLength.put(file.getName(), file.length());
+      if (LOGGER.isDebugEnabled()) {
+        LOGGER.debug("Upload file[" + file.getAbsolutePath() + "] to [" + remotePath + "] start.");
+      }
+      try {
+        CarbonUtil.copyCarbonDataFileToCarbonStorePath(file.getAbsolutePath(), remotePath, 1024);
+      } catch (CarbonDataWriterException exception) {
+        LOGGER.error(exception.getMessage(), exception);
+        throw exception;
+      }
+      if (LOGGER.isDebugEnabled()) {
+        LOGGER.debug("Upload file[" + file.getAbsolutePath() + "] to [" + remotePath + "] end.");
+      }
+    }
+    return fileNameMapLength;
+  }
+
+  /**
+   * Deletes the given segment data path; a failure is logged and not propagated.
+   */
+  private void deleteSegmentDataFilesQuietly(final String segmentDataPath) {
+    try {
+      CarbonUtil.deleteFoldersAndFiles(FileFactory.getCarbonFile(segmentDataPath));
+    } catch (Throwable exception) {
+      LOGGER.error("Fail to delete segment data path [" + segmentDataPath + "].", exception);
+    }
+  }
+}
diff --git 
a/integration/flink/src/main/java/org/apache/carbon/flink/CarbonS3WriterFactory.java
 
b/integration/flink/src/main/java/org/apache/carbon/flink/CarbonS3WriterFactory.java
new file mode 100644
index 0000000..f1ab483
--- /dev/null
+++ 
b/integration/flink/src/main/java/org/apache/carbon/flink/CarbonS3WriterFactory.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbon.flink;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.UUID;
+
+import org.apache.carbondata.common.exceptions.sql.InvalidLoadOptionException;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.metadata.schema.table.TableInfo;
+import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
+import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
+import org.apache.carbondata.core.util.ThreadLocalSessionInfo;
+import org.apache.carbondata.sdk.file.CarbonWriter;
+import org.apache.carbondata.sdk.file.Schema;
+
+/**
+ * Factory for {@link CarbonS3Writer} instances. The temporary write path prefix and the S3
+ * credentials are read from the writer properties named in {@link CarbonS3Property}.
+ */
+public final class CarbonS3WriterFactory extends CarbonWriterFactory {
+
+  private static final long serialVersionUID = 2302824357711095245L;
+
+  @Override
+  public String getType() {
+    return CarbonS3WriterFactoryBuilder.TYPE;
+  }
+
+  /**
+   * Creates a writer for a new, randomly named partition. The SDK writer works on a copy of
+   * the table whose table path points to the temporary write path.
+   */
+  @Override
+  protected CarbonS3Writer create0() throws IOException {
+    final String tempPath = this.getWriteTempPath();
+    final String partition = UUID.randomUUID().toString().replace("-", "");
+    final String localPath = tempPath + "_" + partition + "/";
+    final CarbonTable sourceTable = this.getTable();
+    // Copy the table through serialization so that changing the path leaves the source as is.
+    final TableInfo copiedInfo = TableInfo.deserialize(sourceTable.getTableInfo().serialize());
+    final CarbonTable workingTable = CarbonTable.buildFromTableInfo(copiedInfo);
+    workingTable.getTableInfo().setTablePath(localPath);
+    final org.apache.hadoop.conf.Configuration hadoopConf = this.getS3Configuration();
+    final CarbonWriter sdkWriter;
+    try {
+      sdkWriter = CarbonWriter.builder()
+          .outputPath("")
+          .writtenBy("flink")
+          .withTable(workingTable)
+          .withTableProperties(this.getTableProperties())
+          .withJsonInput(this.getTableSchema(workingTable))
+          .withHadoopConf(hadoopConf)
+          .build();
+    } catch (InvalidLoadOptionException exception) {
+      // TODO
+      throw new UnsupportedOperationException(exception);
+    }
+    return new CarbonS3Writer(this, sourceTable, sdkWriter, localPath, partition, hadoopConf);
+  }
+
+  /**
+   * Creates a writer for the given partition. No SDK writer is attached to it.
+   */
+  @Override
+  protected CarbonS3Writer create0(final String partition) throws IOException {
+    final String localPath = this.getWriteTempPath() + "_" + partition + "/";
+    final CarbonTable sourceTable = this.getTable();
+    final org.apache.hadoop.conf.Configuration hadoopConf = this.getS3Configuration();
+    return new CarbonS3Writer(this, sourceTable, null, localPath, partition, hadoopConf);
+  }
+
+  /**
+   * Sets the S3 configuration to the current thread before the table is resolved.
+   */
+  @Override
+  protected CarbonTable getTable() throws IOException {
+    final org.apache.hadoop.conf.Configuration hadoopConf = this.getS3Configuration();
+    this.setS3Configuration(hadoopConf);
+    return super.getTable();
+  }
+
+  /**
+   * Reads the temporary write path prefix from the writer properties.
+   *
+   * @throws IllegalArgumentException if the property is not set
+   */
+  private String getWriteTempPath() {
+    final String tempPath = this.getConfiguration().getWriterProperties()
+        .getProperty(CarbonS3Property.DATA_TEMP_PATH);
+    if (tempPath == null) {
+      throw new IllegalArgumentException(
+          "Writer property [" + CarbonS3Property.DATA_TEMP_PATH + "] is not set."
+      );
+    }
+    return tempPath;
+  }
+
+  /**
+   * Builds the SDK schema from the column schemas of the table, in create order.
+   */
+  private Schema getTableSchema(final CarbonTable table) {
+    final List<CarbonColumn> columns = table.getCreateOrderColumn();
+    final List<ColumnSchema> schemas = new ArrayList<>(columns.size());
+    for (CarbonColumn each : columns) {
+      schemas.add(each.getColumnSchema());
+    }
+    return new Schema(schemas);
+  }
+
+  /**
+   * Copies the configured table properties into a map.
+   */
+  private Map<String, String> getTableProperties() {
+    final Properties source = this.getConfiguration().getTableProperties();
+    final Map<String, String> copy = new HashMap<>(source.size());
+    for (String key : source.stringPropertyNames()) {
+      copy.put(key, source.getProperty(key));
+    }
+    return copy;
+  }
+
+  /**
+   * Creates a hadoop configuration carrying the access key, secret key, endpoint and file
+   * system implementation for both the s3 and the s3a scheme.
+   *
+   * @throws IllegalArgumentException if the access key, secret key or endpoint is not set
+   */
+  private org.apache.hadoop.conf.Configuration getS3Configuration() {
+    final Properties properties = this.getConfiguration().getWriterProperties();
+    final String accessKey = properties.getProperty(CarbonS3Property.ACCESS_KEY);
+    final String secretKey = properties.getProperty(CarbonS3Property.SECRET_KEY);
+    final String endpoint = properties.getProperty(CarbonS3Property.ENDPOINT);
+    requireSet(accessKey, CarbonS3Property.ACCESS_KEY);
+    requireSet(secretKey, CarbonS3Property.SECRET_KEY);
+    requireSet(endpoint, CarbonS3Property.ENDPOINT);
+    final String fileSystemClass = "org.apache.hadoop.fs.s3a.S3AFileSystem";
+    final org.apache.hadoop.conf.Configuration hadoopConf =
+        new org.apache.hadoop.conf.Configuration(true);
+    // Settings for the s3 scheme.
+    hadoopConf.set("fs.s3.access.key", accessKey);
+    hadoopConf.set("fs.s3.secret.key", secretKey);
+    hadoopConf.set("fs.s3.endpoint", endpoint);
+    hadoopConf.set("fs.s3.impl", fileSystemClass);
+    hadoopConf.set("spark.hadoop.fs.s3.impl", fileSystemClass);
+    // Settings for the s3a scheme.
+    hadoopConf.set("fs.s3a.access.key", accessKey);
+    hadoopConf.set("fs.s3a.secret.key", secretKey);
+    hadoopConf.set("fs.s3a.endpoint", endpoint);
+    hadoopConf.set("fs.s3a.impl", fileSystemClass);
+    hadoopConf.set("spark.hadoop.fs.s3a.impl", fileSystemClass);
+    return hadoopConf;
+  }
+
+  /**
+   * Rejects a writer property which has no value.
+   *
+   * @throws IllegalArgumentException if {@code value} is {@code null}
+   */
+  private static void requireSet(final String value, final String propertyName) {
+    if (value == null) {
+      throw new IllegalArgumentException(
+          "Writer property [" + propertyName + "] is not set."
+      );
+    }
+  }
+
+  private void setS3Configuration(final org.apache.hadoop.conf.Configuration configuration) {
+    ThreadLocalSessionInfo.setConfigurationToCurrentThread(configuration);
+  }
+
+}
diff --git 
a/integration/flink/src/main/java/org/apache/carbon/flink/CarbonS3WriterFactoryBuilder.java
 
b/integration/flink/src/main/java/org/apache/carbon/flink/CarbonS3WriterFactoryBuilder.java
new file mode 100644
index 0000000..afd9e5e
--- /dev/null
+++ 
b/integration/flink/src/main/java/org/apache/carbon/flink/CarbonS3WriterFactoryBuilder.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbon.flink;
+
+import java.util.Properties;
+
+/**
+ * {@link CarbonWriterFactoryBuilder} whose {@link #getType() type} is {@value #TYPE};
+ * it creates {@link CarbonS3WriterFactory} instances.
+ */
+public final class CarbonS3WriterFactoryBuilder extends CarbonWriterFactoryBuilder {
+
+  /** Type name reported by {@link #getType()}. */
+  public static final String TYPE = "S3";
+
+  @Override
+  public String getType() {
+    return TYPE;
+  }
+
+  /**
+   * Creates a {@link CarbonS3WriterFactory} and configures it. The six arguments are
+   * passed, unchanged and in the same order, to a new
+   * {@link ProxyFileWriterFactory.Configuration} that is set on the factory.
+   *
+   * @return the configured factory
+   */
+  @Override
+  public CarbonS3WriterFactory build(
+      final String databaseName,
+      final String tableName,
+      final String tablePath,
+      final Properties tableProperties,
+      final Properties writerProperties,
+      final Properties carbonProperties
+  ) {
+    final CarbonS3WriterFactory s3Factory = new CarbonS3WriterFactory();
+    final ProxyFileWriterFactory.Configuration settings =
+        new ProxyFileWriterFactory.Configuration(
+            databaseName,
+            tableName,
+            tablePath,
+            tableProperties,
+            writerProperties,
+            carbonProperties
+        );
+    s3Factory.setConfiguration(settings);
+    return s3Factory;
+  }
+
+}
diff --git 
a/integration/flink/src/main/java/org/apache/carbon/flink/CarbonWriter.java 
b/integration/flink/src/main/java/org/apache/carbon/flink/CarbonWriter.java
new file mode 100644
index 0000000..d5ddac5
--- /dev/null
+++ b/integration/flink/src/main/java/org/apache/carbon/flink/CarbonWriter.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbon.flink;
+
+/**
+ * This class is a wrapper around the CarbonWriter of the SDK.
+ * It not only writes data to carbon through that writer, but also generates the segment file.
+ */
+public abstract class CarbonWriter extends ProxyFileWriter<String> {
+
+}
diff --git 
a/integration/flink/src/main/java/org/apache/carbon/flink/CarbonWriterFactory.java
 
b/integration/flink/src/main/java/org/apache/carbon/flink/CarbonWriterFactory.java
new file mode 100644
index 0000000..d3257c9
--- /dev/null
+++ 
b/integration/flink/src/main/java/org/apache/carbon/flink/CarbonWriterFactory.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbon.flink;
+
+import java.io.IOException;
+
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.util.CarbonProperties;
+
+import org.apache.flink.core.fs.FSDataOutputStream;
+
+/**
+ * Base class of the carbon writer factories. It is a {@link ProxyFileWriterFactory} for
+ * {@code String} records; the concrete writers are supplied by subclasses through
+ * {@link #create0()} and {@link #create0(String)}.
+ */
+public abstract class CarbonWriterFactory extends ProxyFileWriterFactory<String> {
+
+  /**
+   * Looks up the factory builder for the given type.
+   *
+   * @param type builder type, forwarded to {@link CarbonWriterFactoryBuilder#get(String)}
+   * @return whatever {@link CarbonWriterFactoryBuilder#get(String)} returns for the type
+   */
+  public static CarbonWriterFactoryBuilder builder(final String type) {
+    return CarbonWriterFactoryBuilder.get(type);
+  }
+
+  /**
+   * Applies the configured carbon properties, creates a writer and binds it to the stream.
+   *
+   * @param out output stream, must be a {@link ProxyRecoverableOutputStream}
+   * @return the new writer, already bound to {@code out}
+   * @throws IllegalArgumentException if {@code out} is of any other type (or null)
+   * @throws IOException if raised while the writer is created
+   */
+  @Override
+  public CarbonWriter create(final FSDataOutputStream out) throws IOException {
+    if (out instanceof ProxyRecoverableOutputStream) {
+      final ProxyRecoverableOutputStream stream = (ProxyRecoverableOutputStream) out;
+      this.setCarbonProperties();
+      final CarbonWriter created = this.create0();
+      stream.bind(created);
+      return created;
+    }
+    throw new IllegalArgumentException(
+        "Only support " + ProxyRecoverableOutputStream.class.getName() + "."
+    );
+  }
+
+  /**
+   * Applies the configured carbon properties and creates a writer for the partition.
+   *
+   * @param partition partition value, forwarded to {@link #create0(String)}
+   * @return the new writer
+   * @throws IOException if raised while the writer is created
+   */
+  @Override
+  public CarbonWriter create(final String partition) throws IOException {
+    this.setCarbonProperties();
+    return this.create0(partition);
+  }
+
+  /** Creates the writer returned by {@link #create(FSDataOutputStream)}. */
+  protected abstract CarbonWriter create0() throws IOException;
+
+  /** Creates the writer returned by {@link #create(String)}. */
+  protected abstract CarbonWriter create0(String partition) throws IOException;
+
+  /**
+   * Builds the {@link CarbonTable} from the table name, database name and table path held
+   * by this factory's configuration; the last argument of
+   * {@code CarbonTable.buildFromTablePath} is passed as {@code null}.
+   */
+  protected CarbonTable getTable() throws IOException {
+    final Configuration settings = this.getConfiguration();
+    final String tableName = settings.getTableName();
+    final String databaseName = settings.getDatabaseName();
+    final String tablePath = settings.getTablePath();
+    return CarbonTable.buildFromTablePath(tableName, databaseName, tablePath, null);
+  }
+
+  /**
+   * Adds every entry of the configuration's carbon properties to the instance returned by
+   * {@code CarbonProperties.getInstance()}.
+   */
+  private void setCarbonProperties() {
+    final CarbonProperties target = CarbonProperties.getInstance();
+    final Configuration settings = this.getConfiguration();
+    for (String name : settings.getCarbonProperties().stringPropertyNames()) {
+      target.addProperty(name, settings.getCarbonProperties().getProperty(name));
+    }
+  }
+
+}
diff --git 
a/integration/flink/src/main/java/org/apache/carbon/flink/CarbonWriterFactoryBuilder.java
 
b/integration/flink/src/main/java/org/apache/carbon/flink/CarbonWriterFactoryBuilder.java
new file mode 100644
index 0000000..28d2225
--- /dev/null
+++ 
b/integration/flink/src/main/java/org/apache/carbon/flink/CarbonWriterFactoryBuilder.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbon.flink;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+import java.util.ServiceLoader;
+
+import org.apache.carbondata.common.logging.LogServiceFactory;
+
+import org.apache.log4j.Logger;
+
+/**
+ * Base class and registry of the builders that create {@link CarbonWriterFactory}
+ * instances. Implementations are discovered with {@link ServiceLoader} when this class
+ * is initialized and are looked up by their {@link #getType() type}.
+ */
+public abstract class CarbonWriterFactoryBuilder {
+
+  private static final Logger LOGGER =
+      LogServiceFactory.getLogService(CarbonWriterFactoryBuilder.class.getName());
+
+  /** Unmodifiable view of the discovered builders, keyed by {@link #getType()}. */
+  private static final Map<String, CarbonWriterFactoryBuilder> BUILDER_MAP = loadBuilders();
+
+  /**
+   * Collects every builder offered by {@link ServiceLoader}. If two builders report the
+   * same type, the one iterated later replaces the earlier one.
+   */
+  private static Map<String, CarbonWriterFactoryBuilder> loadBuilders() {
+    final Map<String, CarbonWriterFactoryBuilder> discovered = new HashMap<>();
+    final ServiceLoader<CarbonWriterFactoryBuilder> loader =
+        ServiceLoader.load(CarbonWriterFactoryBuilder.class);
+    for (CarbonWriterFactoryBuilder candidate : loader) {
+      discovered.put(candidate.getType(), candidate);
+      if (LOGGER.isDebugEnabled()) {
+        LOGGER.debug("Added carbon writer factory builder. " + candidate.getClass().getName());
+      }
+    }
+    return Collections.unmodifiableMap(discovered);
+  }
+
+  /**
+   * Returns the builder for the given type.
+   *
+   * <p>The discovered builders are searched first, with an exact (case-sensitive) match.
+   * If none matches and the type equals {@link CarbonS3WriterFactoryBuilder#TYPE} ignoring
+   * case, a new {@link CarbonS3WriterFactoryBuilder} is returned instead.
+   *
+   * @param type builder type, must not be null
+   * @return the matching builder, or {@code null} if the type is unknown
+   * @throws IllegalArgumentException if {@code type} is null
+   */
+  public static CarbonWriterFactoryBuilder get(final String type) {
+    if (type == null) {
+      throw new IllegalArgumentException("Argument [type] is null.");
+    }
+    final CarbonWriterFactoryBuilder registered = BUILDER_MAP.get(type);
+    if (registered != null) {
+      return registered;
+    }
+    if (type.equalsIgnoreCase(CarbonS3WriterFactoryBuilder.TYPE)) {
+      return new CarbonS3WriterFactoryBuilder();
+    }
+    return null;
+  }
+
+  /** Returns the type name under which this builder is looked up by {@link #get(String)}. */
+  public abstract String getType();
+
+  /**
+   * Creates a {@link CarbonWriterFactory} from the given table identity and properties.
+   *
+   * @return the new factory
+   */
+  public abstract CarbonWriterFactory build(
+      String databaseName,
+      String tableName,
+      String tablePath,
+      Properties tableProperties,
+      Properties writerProperties,
+      Properties carbonProperties
+  );
+
+}
diff --git 
a/integration/flink/src/main/resources/META-INF/services/org.apache.carbon.flink.CarbonWriterFactoryBuilder
 
b/integration/flink/src/main/resources/META-INF/services/org.apache.carbon.flink.CarbonWriterFactoryBuilder
new file mode 100644
index 0000000..5c8666b
--- /dev/null
+++ 
b/integration/flink/src/main/resources/META-INF/services/org.apache.carbon.flink.CarbonWriterFactoryBuilder
@@ -0,0 +1,2 @@
+org.apache.carbon.flink.CarbonLocalWriterFactoryBuilder
+org.apache.carbon.flink.CarbonS3WriterFactoryBuilder
\ No newline at end of file
diff --git 
a/integration/flink/src/test/scala/org/apache/carbon/flink/TestCarbonWriter.scala
 
b/integration/flink/src/test/scala/org/apache/carbon/flink/TestCarbonWriter.scala
new file mode 100644
index 0000000..55085f8
--- /dev/null
+++ 
b/integration/flink/src/test/scala/org/apache/carbon/flink/TestCarbonWriter.scala
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbon.flink
+
+import java.util.Properties
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.flink.api.common.restartstrategy.RestartStrategies
+import org.apache.flink.core.fs.Path
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
+import 
org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.test.util.QueryTest
+
+import org.junit.Test
+
+/**
+ * Runs a Flink job that writes JSON records into a carbon table through a
+ * StreamingFileSink built on the "Local" carbon writer factory, then queries the table.
+ */
+class TestCarbonWriter extends QueryTest {
+
+  val tableName = "test_flink"
+
+  @Test
+  def testLocal(): Unit = {
+    sql(s"drop table if exists $tableName").collect()
+    sql(
+      s"""
+         | CREATE TABLE $tableName (stringField string, intField int, shortField short)
+         | STORED AS carbondata
+      """.stripMargin
+    ).collect()
+
+    try {
+      val targetDir = System.getProperty("user.dir") + "/target/test-classes"
+      val writerProperties =
+        newWriterProperties(targetDir + "/data/temp/", targetDir + "/data/", storeLocation)
+      val carbonProperties = newCarbonProperties(storeLocation)
+
+      // Single task, a checkpoint every 2 seconds, and no restart when the job fails.
+      val environment = StreamExecutionEnvironment.getExecutionEnvironment
+      environment.setParallelism(1)
+      environment.enableCheckpointing(2000L)
+      environment.setRestartStrategy(RestartStrategies.noRestart)
+
+      // 10000 JSON records, each produced after a 1 ms sleep.
+      val source = new TestSource(10000) {
+        @throws[InterruptedException]
+        override def get(index: Int): String = {
+          Thread.sleep(1L)
+          s"""{"stringField": "test$index", "intField": $index, "shortField": 12345}"""
+        }
+
+        // NOTE(review): presumably keeps the source alive long enough for a last
+        // checkpoint to be taken after the final record - confirm.
+        @throws[InterruptedException]
+        override def onFinish(): Unit = {
+          Thread.sleep(10000L)
+        }
+      }
+      val stream = environment.addSource(source)
+
+      val factory = CarbonWriterFactory.builder("Local").build(
+        "default",
+        tableName,
+        storeLocation + "/" + tableName + "/",
+        new Properties,
+        writerProperties,
+        carbonProperties
+      )
+      val sink = StreamingFileSink
+        .forBulkFormat(new Path(ProxyFileSystem.DEFAULT_URI), factory)
+        .build
+      stream.addSink(sink)
+
+      try {
+        environment.execute()
+      } catch {
+        case failure: Exception =>
+          // TODO
+          throw new UnsupportedOperationException(failure)
+      }
+
+      // NOTE(review): zero rows are expected although records were written - presumably
+      // because the written data is not yet visible to queries; confirm.
+      checkAnswer(sql(s"select count(1) from $tableName"), Seq(Row(0)))
+    } finally {
+      // The table is left in place (the drop below is disabled); testLocal drops it
+      // again before creating it.
+      // sql(s"drop table if exists $tableName").collect()
+    }
+  }
+
+  /** Properties handed to the writer factory as writer properties. */
+  private def newWriterProperties(
+      dataTempPath: String,
+      dataPath: String,
+      storeLocation: String) = {
+    propertiesOf(
+      CarbonLocalProperty.DATA_TEMP_PATH -> dataTempPath,
+      CarbonLocalProperty.DATA_PATH -> dataPath,
+      CarbonCommonConstants.STORE_LOCATION -> storeLocation,
+      CarbonCommonConstants.UNSAFE_WORKING_MEMORY_IN_MB -> "1024"
+    )
+  }
+
+  /** Properties handed to the writer factory as carbon properties. */
+  private def newCarbonProperties(storeLocation: String) = {
+    propertiesOf(
+      CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT ->
+        CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT,
+      CarbonCommonConstants.CARBON_DATE_FORMAT ->
+        CarbonCommonConstants.CARBON_DATE_DEFAULT_FORMAT,
+      CarbonCommonConstants.STORE_LOCATION -> storeLocation,
+      CarbonCommonConstants.UNSAFE_WORKING_MEMORY_IN_MB -> "1024"
+    )
+  }
+
+  /** Builds a Properties object holding the given key/value pairs. */
+  private def propertiesOf(entries: (String, String)*): Properties = {
+    val properties = new Properties
+    entries.foreach { case (key, value) => properties.setProperty(key, value) }
+    properties
+  }
+
+}
diff --git 
a/integration/flink/src/test/scala/org/apache/carbon/flink/TestSource.scala 
b/integration/flink/src/test/scala/org/apache/carbon/flink/TestSource.scala
new file mode 100644
index 0000000..88ac173
--- /dev/null
+++ b/integration/flink/src/test/scala/org/apache/carbon/flink/TestSource.scala
@@ -0,0 +1,52 @@
+package org.apache.carbon.flink
+
+import org.apache.flink.api.common.state.{ListState, ListStateDescriptor}
+import org.apache.flink.runtime.state.{FunctionInitializationContext, 
FunctionSnapshotContext}
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction
+import org.apache.flink.streaming.api.functions.source.SourceFunction
+
+/**
+ * Test source that emits `dataCount` records, produced one at a time by `get`, and keeps
+ * the index of the next record in Flink operator state so that a restored job continues
+ * from the index stored by the last snapshot.
+ *
+ * @param dataCount number of records to emit
+ */
+abstract class TestSource(val dataCount: Int)
+  extends SourceFunction[String] with CheckpointedFunction {
+
+  /** Index of the next record to emit. */
+  private var dataIndex = 0
+  private var dataIndexState: ListState[Integer] = _
+  // Flink calls cancel() from a different thread than the one executing run(), so the
+  // flag must be volatile for the loop to observe the change.
+  @volatile private var running = false
+
+  /** Produces the record with the given index. */
+  @throws[Exception]
+  def get(index: Int): String
+
+  /** Hook invoked once after the emit loop has ended; does nothing by default. */
+  @throws[Exception]
+  def onFinish(): Unit = {
+    // nothing to do.
+  }
+
+  /**
+   * Emits records until `dataCount` is reached or the source is cancelled, then calls
+   * `onFinish`.
+   */
+  @throws[Exception]
+  override def run(sourceContext: SourceFunction.SourceContext[String]): Unit = {
+    this.running = true
+    val checkpointLock = sourceContext.getCheckpointLock
+    while (this.running && this.dataIndex < this.dataCount) {
+      // Produce the record outside the lock: get() may block.
+      val record = this.get(this.dataIndex)
+      // Emit and advance under the checkpoint lock so that snapshotState() never sees
+      // a record that was emitted but not yet counted.
+      checkpointLock.synchronized {
+        sourceContext.collectWithTimestamp(record, System.currentTimeMillis)
+        this.dataIndex += 1
+      }
+    }
+    this.onFinish()
+  }
+
+  /** Asks the emit loop in `run` to stop before its next record. */
+  override def cancel(): Unit = {
+    this.running = false
+  }
+
+  /** Replaces the stored state with the current index. */
+  @throws[Exception]
+  override def snapshotState(context: FunctionSnapshotContext): Unit = {
+    this.dataIndexState.clear()
+    this.dataIndexState.add(this.dataIndex)
+  }
+
+  /** Obtains the list state and, when the job was restored, reloads the index from it. */
+  @throws[Exception]
+  override def initializeState(context: FunctionInitializationContext): Unit = {
+    this.dataIndexState = context.getOperatorStateStore.getListState(
+      new ListStateDescriptor[Integer]("dataIndex", classOf[Integer]))
+    if (context.isRestored) {
+      // snapshotState() stores a single value; should several be restored, the last
+      // one wins.
+      val restored = this.dataIndexState.get.iterator()
+      while (restored.hasNext) {
+        this.dataIndex = restored.next().intValue()
+      }
+    }
+  }
+}
\ No newline at end of file
diff --git a/pom.xml b/pom.xml
index 9b5f098..e2f5067 100644
--- a/pom.xml
+++ b/pom.xml
@@ -113,6 +113,8 @@
     <module>datamap/mv/core</module>
     <module>examples/spark2</module>
     <module>integration/hive</module>
+    <module>integration/flink</module>
+    <module>integration/flink-proxy</module>
     <module>integration/presto</module>
     <module>examples/flink</module>
     <module>streaming</module>
diff --git 
a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java
 
b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java
index ffad6b1..b1985de 100644
--- 
a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java
+++ 
b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java
@@ -272,6 +272,18 @@ public class CarbonWriterBuilder {
   }
 
   /**
+   * Sets the carbon table to be used by the sdk writer.
+   *
+   * @param table carbon table, must not be null
+   * @return this CarbonWriterBuilder object
+   * @throws NullPointerException if the table is null
+   */
+  public CarbonWriterBuilder withTable(CarbonTable table) {
+    this.carbonTable = Objects.requireNonNull(table, "Table should not be null");
+    return this;
+  }
+
+  /**
    * To support the table properties for sdk writer
    *
    * @param options key,value pair of create table properties.
@@ -646,7 +658,7 @@ public class CarbonWriterBuilder {
 
   public CarbonLoadModel buildLoadModel(Schema carbonSchema)
       throws IOException, InvalidLoadOptionException {
-    timestamp = System.nanoTime();
+    timestamp = System.currentTimeMillis();
     // validate long_string_column
     Set<String> longStringColumns = new HashSet<>();
     if (options != null && 
options.get(CarbonCommonConstants.LONG_STRING_COLUMNS) != null) {

Reply via email to