This is an automated email from the ASF dual-hosted git repository. jackylk pushed a commit to branch flink in repository https://gitbox.apache.org/repos/asf/carbondata.git
The following commit(s) were added to refs/heads/flink by this push: new 5322767 [CARBONDATA-3557] Support write Flink streaming data to Carbon 5322767 is described below commit 5322767b70615e65d1e6c98d56582028c1f20f96 Author: liuzhi <371684...@qq.com> AuthorDate: Wed Nov 27 17:44:21 2019 +0800 [CARBONDATA-3557] Support write Flink streaming data to Carbon The write process is: 1. For every checkpoint in each Flink task, write data to local file system by StreamingFileSink and carbon SDK; 2. Copy local carbon data file to carbon data store system, such as HDFS, S3; 3. Generate and write metadata file and success file to ${tablePath}/Metadata/stage folder as a commit; This closes #3421 --- .../core/indexstore/UnsafeMemoryDMStore.java | 10 +- .../indexstore/blockletindex/BlockDataMap.java | 16 +- .../core/indexstore/row/DataMapRowImpl.java | 5 + .../carbondata/core/statusmanager/FileFormat.java | 3 + .../carbondata/core/statusmanager/StageInput.java | 58 +++++ .../apache/carbondata/core/util/CarbonUtil.java | 8 +- .../carbondata/core/util/path/CarbonTablePath.java | 6 + .../hadoop/api/CarbonTableOutputFormat.java | 8 +- integration/flink-proxy/pom.xml | 43 ++++ .../org/apache/carbon/flink/ProxyFileSystem.java | 126 ++++++++++ .../carbon/flink/ProxyFileSystemFactory.java | 43 ++++ .../org/apache/carbon/flink/ProxyFileWriter.java | 34 +++ .../carbon/flink/ProxyFileWriterFactory.java | 152 ++++++++++++ .../org/apache/carbon/flink/ProxyRecoverable.java | 53 ++++ .../carbon/flink/ProxyRecoverableOutputStream.java | 126 ++++++++++ .../carbon/flink/ProxyRecoverableSerializer.java | 174 ++++++++++++++ .../carbon/flink/ProxyRecoverableWriter.java | 80 ++++++ .../org.apache.flink.core.fs.FileSystemFactory | 1 + integration/flink/pom.xml | 267 +++++++++++++++++++++ .../apache/carbon/core/metadata/StageManager.java | 101 ++++++++ .../apache/carbon/flink/CarbonLocalProperty.java | 30 +++ .../org/apache/carbon/flink/CarbonLocalWriter.java | 233 ++++++++++++++++++ 
.../carbon/flink/CarbonLocalWriterFactory.java | 108 +++++++++ .../flink/CarbonLocalWriterFactoryBuilder.java | 53 ++++ .../org/apache/carbon/flink/CarbonS3Property.java | 36 +++ .../org/apache/carbon/flink/CarbonS3Writer.java | 219 +++++++++++++++++ .../apache/carbon/flink/CarbonS3WriterFactory.java | 157 ++++++++++++ .../carbon/flink/CarbonS3WriterFactoryBuilder.java | 54 +++++ .../java/org/apache/carbon/flink/CarbonWriter.java | 26 ++ .../apache/carbon/flink/CarbonWriterFactory.java | 77 ++++++ .../carbon/flink/CarbonWriterFactoryBuilder.java | 74 ++++++ ....apache.carbon.flink.CarbonWriterFactoryBuilder | 2 + .../org/apache/carbon/flink/TestCarbonWriter.scala | 124 ++++++++++ .../scala/org/apache/carbon/flink/TestSource.scala | 52 ++++ pom.xml | 2 + .../carbondata/sdk/file/CarbonWriterBuilder.java | 14 +- 36 files changed, 2563 insertions(+), 12 deletions(-) diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/UnsafeMemoryDMStore.java b/core/src/main/java/org/apache/carbondata/core/indexstore/UnsafeMemoryDMStore.java index 5ae72fc..8d9cb57 100644 --- a/core/src/main/java/org/apache/carbondata/core/indexstore/UnsafeMemoryDMStore.java +++ b/core/src/main/java/org/apache/carbondata/core/indexstore/UnsafeMemoryDMStore.java @@ -231,10 +231,12 @@ public class UnsafeMemoryDMStore extends AbstractMemoryDMStore { getUnsafe().putInt(memoryBlock.getBaseObject(), memoryBlock.getBaseOffset() + startOffset + schema.getBytePosition(), varPosition); runningLength += 4; - getUnsafe().copyMemory(data, BYTE_ARRAY_OFFSET, memoryBlock.getBaseObject(), - memoryBlock.getBaseOffset() + startOffset + varPosition, data.length); - runningLength += data.length; - varPosition += data.length; + if (data != null) { + getUnsafe().copyMemory(data, BYTE_ARRAY_OFFSET, memoryBlock.getBaseObject(), + memoryBlock.getBaseOffset() + startOffset + varPosition, data.length); + runningLength += data.length; + varPosition += data.length; + } return varPosition; default: throw new 
UnsupportedOperationException( diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockDataMap.java b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockDataMap.java index 52cd8cc..e83985f 100644 --- a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockDataMap.java +++ b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockDataMap.java @@ -130,7 +130,7 @@ public class BlockDataMap extends CoarseGrainDataMap Path path = new Path(blockletDataMapInfo.getFilePath()); // store file path only in case of partition table, non transactional table and flat folder // structure - byte[] filePath = null; + byte[] filePath; boolean isPartitionTable = blockletDataMapInfo.getCarbonTable().isHivePartitionTable(); if (isPartitionTable || !blockletDataMapInfo.getCarbonTable().isTransactionalTable() || blockletDataMapInfo.getCarbonTable().isSupportFlatFolder() || @@ -139,6 +139,8 @@ public class BlockDataMap extends CoarseGrainDataMap blockletDataMapInfo.getCarbonTable().getTablePath())) { filePath = path.getParent().toString().getBytes(CarbonCommonConstants.DEFAULT_CHARSET); isFilePathStored = true; + } else { + filePath = new byte[0]; } byte[] fileName = path.getName().getBytes(CarbonCommonConstants.DEFAULT_CHARSET); byte[] segmentId = @@ -506,7 +508,7 @@ public class BlockDataMap extends CoarseGrainDataMap if (null != summaryRow) { summaryRow.setByteArray(fileName, SUMMARY_INDEX_FILE_NAME); summaryRow.setByteArray(segmentId, SUMMARY_SEGMENTID); - if (null != filePath) { + if (filePath.length > 0) { summaryRow.setByteArray(filePath, SUMMARY_INDEX_PATH); } try { @@ -635,8 +637,14 @@ public class BlockDataMap extends CoarseGrainDataMap // dummy value return 0; } else { - return ByteBuffer.wrap(getBlockletRowCountForEachBlock()).getShort( - index * CarbonCommonConstants.SHORT_SIZE_IN_BYTE); + final byte[] bytes = getBlockletRowCountForEachBlock(); + // if the segment data is 
written in the table path + // then the result of getBlockletRowCountForEachBlock will be empty. + if (bytes.length == 0) { + return 0; + } else { + return ByteBuffer.wrap(bytes).getShort(index * CarbonCommonConstants.SHORT_SIZE_IN_BYTE); + } } } diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/row/DataMapRowImpl.java b/core/src/main/java/org/apache/carbondata/core/indexstore/row/DataMapRowImpl.java index 683d2ef..22eca8a 100644 --- a/core/src/main/java/org/apache/carbondata/core/indexstore/row/DataMapRowImpl.java +++ b/core/src/main/java/org/apache/carbondata/core/indexstore/row/DataMapRowImpl.java @@ -41,6 +41,11 @@ public class DataMapRowImpl extends DataMapRow { @Override public int getLengthInBytes(int ordinal) { + // if the segment data is written in the table path + // then the data[BlockletDataMapRowIndexes.SUMMARY_INDEX_PATH] will be null. + if (data[ordinal] == null) { + return 0; + } return ((byte[]) data[ordinal]).length; } diff --git a/core/src/main/java/org/apache/carbondata/core/statusmanager/FileFormat.java b/core/src/main/java/org/apache/carbondata/core/statusmanager/FileFormat.java index d68429b..b5916fe 100644 --- a/core/src/main/java/org/apache/carbondata/core/statusmanager/FileFormat.java +++ b/core/src/main/java/org/apache/carbondata/core/statusmanager/FileFormat.java @@ -46,6 +46,9 @@ public class FileFormat implements Serializable { } public static FileFormat getByOrdinal(int ordinal) { + if (ordinal < 0) { + throw new IllegalArgumentException("Argument [ordinal] is less than 0."); + } switch (ordinal) { case 0: return COLUMNAR_V3; diff --git a/core/src/main/java/org/apache/carbondata/core/statusmanager/StageInput.java b/core/src/main/java/org/apache/carbondata/core/statusmanager/StageInput.java new file mode 100644 index 0000000..60f8b32 --- /dev/null +++ b/core/src/main/java/org/apache/carbondata/core/statusmanager/StageInput.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + *
contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.core.statusmanager; + +import java.util.Map; + +public class StageInput { + + /** + * the base path of files + */ + private String base; + + /** + * the list of (file, length) in this StageInput + */ + private Map<String, Long> files; + + public StageInput() { + + } + + public StageInput(String base, Map<String, Long> files) { + this.base = base; + this.files = files; + } + + public String getBase() { + return base; + } + + public void setBase(String base) { + this.base = base; + } + + public Map<String, Long> getFiles() { + return files; + } + + public void setFiles(Map<String, Long> files) { + this.files = files; + } +} diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java index 4d5de6b..2ffc3f7 100644 --- a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java +++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java @@ -2833,8 +2833,12 @@ public final class CarbonUtil { .getDataInputStream(localFilePath, FileFactory.getFileType(localFilePath), bufferSize); IOUtils.copyBytes(dataInputStream, dataOutputStream, bufferSize); } finally { - CarbonUtil.closeStream(dataInputStream); - 
CarbonUtil.closeStream(dataOutputStream); + try { + CarbonUtil.closeStream(dataInputStream); + CarbonUtil.closeStream(dataOutputStream); + } catch (IOException exception) { + LOGGER.error(exception.getMessage(), exception); + } } } diff --git a/core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java b/core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java index 6d1a4ff..e0c9d98 100644 --- a/core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java +++ b/core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java @@ -55,6 +55,8 @@ public class CarbonTablePath { private static final String STREAMING_DIR = ".streaming"; private static final String STREAMING_LOG_DIR = "log"; private static final String STREAMING_CHECKPOINT_DIR = "checkpoint"; + private static final String STAGE_DIR = "stage"; + public static final String SUCCESS_FILE_SUBFIX = ".success"; /** * This class provides static utility only. @@ -62,6 +64,10 @@ public class CarbonTablePath { private CarbonTablePath() { } + public static String getStageDir(String tablePath) { + return getMetadataPath(tablePath) + CarbonCommonConstants.FILE_SEPARATOR + STAGE_DIR; + } + /** * The method returns the folder path containing the carbon file. * diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java index fee2e4a..7700c8e 100644 --- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java +++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java @@ -122,6 +122,12 @@ public class CarbonTableOutputFormat extends FileOutputFormat<NullWritable, Obje private CarbonOutputCommitter committer; + /** + * Output format task id generator. It should generate a unique id for every task. + * Using System.nanoTime() as the task id may cause conflicts.
+ */ + private static final AtomicLong DEFAULT_TASK_NO = new AtomicLong(0); + public static void setDatabaseName(Configuration configuration, String databaseName) { if (null != databaseName) { configuration.set(DATABASE_NAME, databaseName); @@ -255,7 +261,7 @@ public class CarbonTableOutputFormat extends FileOutputFormat<NullWritable, Obje } if (null == loadModel.getTaskNo() || loadModel.getTaskNo().isEmpty()) { loadModel.setTaskNo(taskAttemptContext.getConfiguration() - .get("carbon.outputformat.taskno", String.valueOf(System.nanoTime()))); + .get("carbon.outputformat.taskno", String.valueOf(DEFAULT_TASK_NO.getAndIncrement()))); } loadModel.setDataWritePath( taskAttemptContext.getConfiguration().get("carbon.outputformat.writepath")); diff --git a/integration/flink-proxy/pom.xml b/integration/flink-proxy/pom.xml new file mode 100644 index 0000000..27af507 --- /dev/null +++ b/integration/flink-proxy/pom.xml @@ -0,0 +1,43 @@ +<?xml version="1.0" encoding="UTF-8"?> +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + + <parent> + <groupId>org.apache.carbondata</groupId> + <artifactId>carbondata-parent</artifactId> + <version>2.0.0-SNAPSHOT</version> + <relativePath>../../pom.xml</relativePath> + </parent> + + <modelVersion>4.0.0</modelVersion> + + <artifactId>carbondata-flink-proxy</artifactId> + <name>Apache CarbonData :: Flink Proxy</name> + <packaging>jar</packaging> + + <dependencies> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-streaming-java_2.11</artifactId> + <version>1.8.0</version> + </dependency> + </dependencies> + + <properties> + <dev.path>${basedir}/../../dev</dev.path> + </properties> + + <build> + <plugins> + <plugin> + <artifactId>maven-compiler-plugin</artifactId> + <configuration> + <source>1.8</source> + <target>1.8</target> + </configuration> + </plugin> + 
</plugins> + </build> + +</project> \ No newline at end of file diff --git a/integration/flink-proxy/src/main/java/org/apache/carbon/flink/ProxyFileSystem.java b/integration/flink-proxy/src/main/java/org/apache/carbon/flink/ProxyFileSystem.java new file mode 100644 index 0000000..8c557a9 --- /dev/null +++ b/integration/flink-proxy/src/main/java/org/apache/carbon/flink/ProxyFileSystem.java @@ -0,0 +1,126 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbon.flink; + +import java.net.URI; + +import org.apache.flink.core.fs.BlockLocation; +import org.apache.flink.core.fs.FSDataInputStream; +import org.apache.flink.core.fs.FSDataOutputStream; +import org.apache.flink.core.fs.FileStatus; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.core.fs.FileSystemKind; +import org.apache.flink.core.fs.Path; + +/** + * Flink need user provides file output stream when use StreamingFileSink, + * but CarbonWriter encapsulate the file output stream inside the SDK, + * so, we need a proxy file output stream to connect StreamingFileSink and CarbonWriter. 
+ */ +public final class ProxyFileSystem extends FileSystem { + + public static final String SCHEMA = "proxy"; + + public static final URI DEFAULT_URI = URI.create(SCHEMA + ":/"); + + public static final ProxyFileSystem INSTANCE = new ProxyFileSystem(); + + private ProxyFileSystem() { + // private constructor. + } + + @Override + public Path getWorkingDirectory() { + throw new UnsupportedOperationException(); + } + + @Override + public Path getHomeDirectory() { + throw new UnsupportedOperationException(); + } + + @Override + public URI getUri() { + return DEFAULT_URI; + } + + @Override + public FileStatus getFileStatus(final Path path) { + throw new UnsupportedOperationException(); + } + + @Override + public BlockLocation[] getFileBlockLocations( + final FileStatus fileStatus, + final long offset, + final long length + ) { + throw new UnsupportedOperationException(); + } + + @Override + public FSDataInputStream open(final Path path, final int i) { + throw new UnsupportedOperationException(); + } + + @Override + public FSDataInputStream open(final Path path) { + throw new UnsupportedOperationException(); + } + + @Override + public FileStatus[] listStatus(final Path path) { + throw new UnsupportedOperationException(); + } + + @Override + public boolean delete(final Path path, final boolean b) { + throw new UnsupportedOperationException(); + } + + @Override + public boolean mkdirs(final Path path) { + throw new UnsupportedOperationException(); + } + + @Override + public FSDataOutputStream create(final Path path, final WriteMode writeMode) { + throw new UnsupportedOperationException(); + } + + @Override + public ProxyRecoverableWriter createRecoverableWriter() { + return new ProxyRecoverableWriter(); + } + + @Override + public boolean rename(final Path source, final Path target) { + throw new UnsupportedOperationException(); + } + + @Override + public boolean isDistributedFS() { + return false; + } + + @Override + public FileSystemKind getKind() { + return 
FileSystemKind.FILE_SYSTEM; + } + +} diff --git a/integration/flink-proxy/src/main/java/org/apache/carbon/flink/ProxyFileSystemFactory.java b/integration/flink-proxy/src/main/java/org/apache/carbon/flink/ProxyFileSystemFactory.java new file mode 100644 index 0000000..1adb44c --- /dev/null +++ b/integration/flink-proxy/src/main/java/org/apache/carbon/flink/ProxyFileSystemFactory.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbon.flink; + +import java.io.IOException; +import java.net.URI; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.fs.FileSystemFactory; + +public final class ProxyFileSystemFactory implements FileSystemFactory { + + @Override + public String getScheme() { + return ProxyFileSystem.SCHEMA; + } + + @Override + public void configure(final Configuration configuration) { + // to do nothing. 
+ } + + @Override + public ProxyFileSystem create(final URI uri) throws IOException { + return ProxyFileSystem.INSTANCE; + } + +} diff --git a/integration/flink-proxy/src/main/java/org/apache/carbon/flink/ProxyFileWriter.java b/integration/flink-proxy/src/main/java/org/apache/carbon/flink/ProxyFileWriter.java new file mode 100644 index 0000000..f8ef039 --- /dev/null +++ b/integration/flink-proxy/src/main/java/org/apache/carbon/flink/ProxyFileWriter.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.carbon.flink; + +import java.io.IOException; + +import org.apache.flink.api.common.serialization.BulkWriter; + +public abstract class ProxyFileWriter<OUT> implements BulkWriter<OUT> { + + public abstract ProxyFileWriterFactory getFactory(); + + public abstract String getPartition(); + + public abstract void commit() throws IOException; + + public abstract void close() throws IOException; + +} diff --git a/integration/flink-proxy/src/main/java/org/apache/carbon/flink/ProxyFileWriterFactory.java b/integration/flink-proxy/src/main/java/org/apache/carbon/flink/ProxyFileWriterFactory.java new file mode 100644 index 0000000..bb683a3 --- /dev/null +++ b/integration/flink-proxy/src/main/java/org/apache/carbon/flink/ProxyFileWriterFactory.java @@ -0,0 +1,152 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.carbon.flink; + +import java.io.IOException; +import java.io.Serializable; +import java.util.Map; +import java.util.Properties; +import java.util.concurrent.ConcurrentHashMap; + +import org.apache.flink.api.common.serialization.BulkWriter; + +public abstract class ProxyFileWriterFactory<OUT> implements BulkWriter.Factory<OUT> { + + private static final long serialVersionUID = -1449889091046572219L; + + private static final Map<String, Class<? extends ProxyFileWriterFactory>> FACTORY_MAP = + new ConcurrentHashMap<>(); + + public static ProxyFileWriterFactory newInstance(final String factoryType) { + if (factoryType == null) { + throw new IllegalArgumentException("Argument [factoryType] is null."); + } + final Class<? extends ProxyFileWriterFactory> factoryClass = FACTORY_MAP.get(factoryType); + if (factoryClass == null) { + return null; + } + try { + return factoryClass.newInstance(); + } catch (InstantiationException | IllegalAccessException exception) { + throw new RuntimeException(exception); + } + } + + public static void register( + final String factoryType, + final Class<? 
extends ProxyFileWriterFactory> factoryClass + ) { + if (factoryType == null) { + throw new IllegalArgumentException("Argument [factoryType] is null."); + } + if (factoryClass == null) { + throw new IllegalArgumentException("Argument [factoryClass] is null."); + } + // TODO: validate the arguments + // TODO: check whether it is already registered; duplicate registrations should be ignored + FACTORY_MAP.put(factoryType, factoryClass); + } + + private Configuration configuration; + + public abstract String getType(); + + public Configuration getConfiguration() { + return this.configuration; + } + + public void setConfiguration(final Configuration configuration) { + this.configuration = configuration; + } + + public abstract ProxyFileWriter<OUT> create(String partition) throws IOException; + + public static class Configuration implements Serializable { + + private static final long serialVersionUID = 8615149992583690295L; + + public Configuration( + final String databaseName, + final String tableName, + final String tablePath, + final Properties tableProperties, + final Properties writerProperties, + final Properties carbonProperties + ) { + if (tableName == null) { + throw new IllegalArgumentException("Argument [tableName] is null."); + } + if (tablePath == null) { + throw new IllegalArgumentException("Argument [tablePath] is null."); + } + if (tableProperties == null) { + throw new IllegalArgumentException("Argument [tableProperties] is null."); + } + if (writerProperties == null) { + throw new IllegalArgumentException("Argument [writerProperties] is null."); + } + if (carbonProperties == null) { + throw new IllegalArgumentException("Argument [carbonProperties] is null."); + } + this.databaseName = databaseName; + this.tableName = tableName; + this.tablePath = tablePath; + this.tableProperties = tableProperties; + this.writerProperties = writerProperties; + this.carbonProperties = carbonProperties; + } + + private final String databaseName; + + private final String tableName; + + private final String tablePath; + + private final Properties
tableProperties; + + private final Properties writerProperties; + + private final Properties carbonProperties; + + public String getDatabaseName() { + return this.databaseName; + } + + public String getTableName() { + return this.tableName; + } + + public String getTablePath() { + return this.tablePath; + } + + public Properties getTableProperties() { + return this.tableProperties; + } + + public Properties getWriterProperties() { + return this.writerProperties; + } + + public Properties getCarbonProperties() { + return this.carbonProperties; + } + + } + +} diff --git a/integration/flink-proxy/src/main/java/org/apache/carbon/flink/ProxyRecoverable.java b/integration/flink-proxy/src/main/java/org/apache/carbon/flink/ProxyRecoverable.java new file mode 100644 index 0000000..11e5d5e --- /dev/null +++ b/integration/flink-proxy/src/main/java/org/apache/carbon/flink/ProxyRecoverable.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.carbon.flink; + +import org.apache.flink.core.fs.RecoverableWriter; + +public final class ProxyRecoverable + implements RecoverableWriter.CommitRecoverable, RecoverableWriter.ResumeRecoverable { + + public ProxyRecoverable( + final String writerType, + final ProxyFileWriterFactory.Configuration writerConfiguration, + final String partition + ) { + this.writerType = writerType; + this.writerConfiguration = writerConfiguration; + this.partition = partition; + } + + private final String writerType; + + private final ProxyFileWriterFactory.Configuration writerConfiguration; + + private final String partition; + + public String getWriterType() { + return this.writerType; + } + + public ProxyFileWriterFactory.Configuration getWriterConfiguration() { + return this.writerConfiguration; + } + + public String getPartition() { + return this.partition; + } + +} diff --git a/integration/flink-proxy/src/main/java/org/apache/carbon/flink/ProxyRecoverableOutputStream.java b/integration/flink-proxy/src/main/java/org/apache/carbon/flink/ProxyRecoverableOutputStream.java new file mode 100644 index 0000000..1e59209 --- /dev/null +++ b/integration/flink-proxy/src/main/java/org/apache/carbon/flink/ProxyRecoverableOutputStream.java @@ -0,0 +1,126 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbon.flink; + +import java.io.IOException; + +import org.apache.flink.core.fs.RecoverableFsDataOutputStream; +import org.apache.flink.core.fs.RecoverableWriter; + +public final class ProxyRecoverableOutputStream extends RecoverableFsDataOutputStream { + + ProxyRecoverableOutputStream() { + // package-private constructor. + } + + private ProxyFileWriter<?> writer; + + public void bind(final ProxyFileWriter<?> writer) { + if (writer == null) { + throw new IllegalArgumentException("Argument [writer] is null."); + } + if (this.writer != null) { + throw new IllegalStateException("The writer was bound."); + } + this.writer = writer; + } + + @Override + public long getPos() { + throw new UnsupportedOperationException(); + } + + @Override + public RecoverableWriter.ResumeRecoverable persist() { + throw new UnsupportedOperationException(); + } + + @Override + public void write(final int aByte) { + throw new UnsupportedOperationException(); + } + + @Override + public void flush() { + throw new UnsupportedOperationException(); + } + + @Override + public void sync() { + throw new UnsupportedOperationException(); + } + + @Override + public void close() throws IOException { + // TODO: this method is called both when streaming ends and when an exception occurs + if (this.writer != null) { + this.writer.close(); + } + } + + @Override + public Committer closeForCommit() { + if (this.writer == null) { + throw new IllegalStateException("The writer was not bound."); + } + return new Committer( + new ProxyRecoverable( + this.writer.getFactory().getType(), + this.writer.getFactory().getConfiguration(), + this.writer.getPartition() + ) + ); + } + + static final class Committer implements RecoverableFsDataOutputStream.Committer { + + Committer(final ProxyRecoverable recoverable) { + this.recoverable = recoverable; + } + + private final ProxyRecoverable recoverable; + + @Override + public void commit() throws
IOException { + this.newWriter().commit(); + } + + @Override + public void commitAfterRecovery() throws IOException { + // to do nothing. + } + + @Override + public RecoverableWriter.CommitRecoverable getRecoverable() { + return this.recoverable; + } + + private ProxyFileWriter<?> newWriter() throws IOException { + final ProxyFileWriterFactory writerFactory = + ProxyFileWriterFactory.newInstance(this.recoverable.getWriterType()); + if (writerFactory == null) { + // TODO + throw new UnsupportedOperationException(); + } + writerFactory.setConfiguration(this.recoverable.getWriterConfiguration()); + return writerFactory.create(this.recoverable.getPartition()); + } + + } + +} diff --git a/integration/flink-proxy/src/main/java/org/apache/carbon/flink/ProxyRecoverableSerializer.java b/integration/flink-proxy/src/main/java/org/apache/carbon/flink/ProxyRecoverableSerializer.java new file mode 100644 index 0000000..8bf80bd --- /dev/null +++ b/integration/flink-proxy/src/main/java/org/apache/carbon/flink/ProxyRecoverableSerializer.java @@ -0,0 +1,174 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.carbon.flink; + +import java.io.UnsupportedEncodingException; +import java.nio.ByteBuffer; +import java.util.Properties; + +import org.apache.flink.core.io.SimpleVersionedSerializer; + +public final class ProxyRecoverableSerializer + implements SimpleVersionedSerializer<ProxyRecoverable> { + + public static final ProxyRecoverableSerializer INSTANCE = new ProxyRecoverableSerializer(); + + public static final int VERSION = 1; + + private static final byte TRUE = 0; + + private static final byte FALSE = 1; + + private static final String CHARSET = "UTF-8"; + + // TODO: make it configurable + private static final int BUFFER_SIZE = 10240; + + private ProxyRecoverableSerializer() { + // private constructor. + } + + @Override + public int getVersion() { + return VERSION; + } + + @Override + public byte[] serialize(final ProxyRecoverable proxyRecoverable) { + final ByteBuffer byteBuffer = ByteBuffer.allocate(BUFFER_SIZE); + serializeString(byteBuffer, proxyRecoverable.getWriterType()); + serializeConfiguration(byteBuffer, proxyRecoverable.getWriterConfiguration()); + serializeString(byteBuffer, proxyRecoverable.getPartition()); + final byte[] bytes = new byte[byteBuffer.position()]; + byteBuffer.position(0); + byteBuffer.get(bytes); + return bytes; + } + + private static void serializeConfiguration( + final ByteBuffer byteBuffer, + final ProxyFileWriterFactory.Configuration configuration + ) { + serializeString(byteBuffer, configuration.getDatabaseName()); + serializeString(byteBuffer, configuration.getTableName()); + serializeString(byteBuffer, configuration.getTablePath()); + serializeProperties(byteBuffer, configuration.getTableProperties()); + serializeProperties(byteBuffer, configuration.getWriterProperties()); + serializeProperties(byteBuffer, configuration.getCarbonProperties()); + } + + private static void serializeString(final ByteBuffer byteBuffer, final String string) { + if (string == null) { + byteBuffer.put(TRUE); + } else { + 
byteBuffer.put(FALSE); + final byte[] stringBytes; + try { + stringBytes = string.getBytes(CHARSET); + } catch (UnsupportedEncodingException exception) { + throw new RuntimeException(exception); + } + byteBuffer.putInt(stringBytes.length); + byteBuffer.put(stringBytes); + } + } + + private static void serializeProperties( + final ByteBuffer byteBuffer, + final Properties properties + ) { + if (properties == null) { + byteBuffer.put(TRUE); + } else { + byteBuffer.put(FALSE); + byteBuffer.putInt(properties.size()); + for (String propertyName : properties.stringPropertyNames()) { + serializeString(byteBuffer, propertyName); + serializeString(byteBuffer, properties.getProperty(propertyName)); + } + } + } + + @Override + public ProxyRecoverable deserialize(final int version, final byte[] bytes) { + if (version != VERSION) { + throw new UnsupportedOperationException("Unsupported version: " + version + "."); + } + final ByteBuffer byteBuffer = ByteBuffer.wrap(bytes); + final String writerType = deserializeString(byteBuffer); + final ProxyFileWriterFactory.Configuration writerConfiguration = + deserializeConfiguration(byteBuffer); + final String partition = deserializeString(byteBuffer); + return new ProxyRecoverable(writerType, writerConfiguration, partition); + } + + private static ProxyFileWriterFactory.Configuration deserializeConfiguration( + final ByteBuffer byteBuffer + ) { + final String databaseName = deserializeString(byteBuffer); + final String tableName = deserializeString(byteBuffer); + final String tablePath = deserializeString(byteBuffer); + final Properties tableProperties = deserializeProperties(byteBuffer); + final Properties writerProperties = deserializeProperties(byteBuffer); + final Properties carbonProperties = deserializeProperties(byteBuffer); + return new ProxyFileWriterFactory.Configuration( + databaseName, + tableName, + tablePath, + tableProperties, + writerProperties, + carbonProperties + ); + } + + private static String 
deserializeString(final ByteBuffer byteBuffer) { + switch (byteBuffer.get()) { + case TRUE: + return null; + case FALSE: + final int stringByteLength = byteBuffer.getInt(); + final byte[] stringBytes = new byte[stringByteLength]; + byteBuffer.get(stringBytes); + try { + return new String(stringBytes, CHARSET); + } catch (UnsupportedEncodingException exception) { + throw new RuntimeException(exception); + } + default: + throw new RuntimeException(); + } + } + + @SuppressWarnings("ConstantConditions") + private static Properties deserializeProperties(final ByteBuffer byteBuffer) { + switch (byteBuffer.get()) { + case TRUE: + return null; + case FALSE: + final int propertyCount = byteBuffer.getInt(); + final Properties properties = new Properties(); + for (int index = 0; index < propertyCount; index++) { + properties.put(deserializeString(byteBuffer), deserializeString(byteBuffer)); + } + return properties; + default: + throw new RuntimeException(); + } + } + +} diff --git a/integration/flink-proxy/src/main/java/org/apache/carbon/flink/ProxyRecoverableWriter.java b/integration/flink-proxy/src/main/java/org/apache/carbon/flink/ProxyRecoverableWriter.java new file mode 100644 index 0000000..3713ac2 --- /dev/null +++ b/integration/flink-proxy/src/main/java/org/apache/carbon/flink/ProxyRecoverableWriter.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbon.flink; + +import org.apache.flink.core.fs.Path; +import org.apache.flink.core.fs.RecoverableFsDataOutputStream; +import org.apache.flink.core.fs.RecoverableWriter; +import org.apache.flink.core.io.SimpleVersionedSerializer; + +public final class ProxyRecoverableWriter implements RecoverableWriter { + + @SuppressWarnings("unchecked") + @Override + public SimpleVersionedSerializer<CommitRecoverable> getCommitRecoverableSerializer() { + final SimpleVersionedSerializer<? extends CommitRecoverable> serializer = + ProxyRecoverableSerializer.INSTANCE; + return (SimpleVersionedSerializer<CommitRecoverable>) serializer; + } + + @SuppressWarnings("unchecked") + @Override + public SimpleVersionedSerializer<ResumeRecoverable> getResumeRecoverableSerializer() { + final SimpleVersionedSerializer<? 
extends ResumeRecoverable> serializer = + ProxyRecoverableSerializer.INSTANCE; + return (SimpleVersionedSerializer<ResumeRecoverable>) serializer; + } + + @Override + public ProxyRecoverableOutputStream open(final Path path) { + return new ProxyRecoverableOutputStream(); + } + + @Override + public RecoverableFsDataOutputStream recover(final ResumeRecoverable resumeRecoverable) { + throw new UnsupportedOperationException(); + } + + @Override + public boolean requiresCleanupOfRecoverableState() { + return false; + } + + @Override + public boolean cleanupRecoverableState(final ResumeRecoverable resumeRecoverable) { + throw new UnsupportedOperationException(); + } + + @Override + public ProxyRecoverableOutputStream.Committer recoverForCommit( + final CommitRecoverable commitRecoverable + ) { + if (!(commitRecoverable instanceof ProxyRecoverable)) { + throw new IllegalArgumentException( + "ProxyFileSystem can not recover recoverable for other file system. " + commitRecoverable + ); + } + return new ProxyRecoverableOutputStream.Committer((ProxyRecoverable) commitRecoverable); + } + + @Override + public boolean supportsResume() { + return false; + } + +} diff --git a/integration/flink-proxy/src/main/resources/META-INF/services/org.apache.flink.core.fs.FileSystemFactory b/integration/flink-proxy/src/main/resources/META-INF/services/org.apache.flink.core.fs.FileSystemFactory new file mode 100644 index 0000000..f8a0ed8 --- /dev/null +++ b/integration/flink-proxy/src/main/resources/META-INF/services/org.apache.flink.core.fs.FileSystemFactory @@ -0,0 +1 @@ +org.apache.carbon.flink.ProxyFileSystemFactory \ No newline at end of file diff --git a/integration/flink/pom.xml b/integration/flink/pom.xml new file mode 100644 index 0000000..80746c0 --- /dev/null +++ b/integration/flink/pom.xml @@ -0,0 +1,267 @@ +<?xml version="1.0" encoding="UTF-8"?> +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + 
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + + <parent> + <groupId>org.apache.carbondata</groupId> + <artifactId>carbondata-parent</artifactId> + <version>2.0.0-SNAPSHOT</version> + <relativePath>../../pom.xml</relativePath> + </parent> + + <modelVersion>4.0.0</modelVersion> + + <artifactId>carbondata-flink</artifactId> + <name>Apache CarbonData :: Flink</name> + <packaging>jar</packaging> + + <properties> + <dev.path>${basedir}/../../dev</dev.path> + <hadoop.version>2.7.5</hadoop.version> + </properties> + + <dependencies> + <dependency> + <groupId>org.apache.carbondata</groupId> + <artifactId>carbondata-flink-proxy</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>org.apache.carbondata</groupId> + <artifactId>carbondata-format</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>org.apache.carbondata</groupId> + <artifactId>carbondata-processing</artifactId> + <version>${project.version}</version> + <exclusions> + <exclusion> + <groupId>org.apache.spark</groupId> + <artifactId>*</artifactId> + </exclusion> + <exclusion> + <groupId>org.apache.hive</groupId> + <artifactId>*</artifactId> + </exclusion> + <exclusion> + <groupId>org.apache.flink</groupId> + <artifactId>*</artifactId> + </exclusion> + <exclusion> + <groupId>org.apache.hadoop</groupId> + <artifactId>*</artifactId> + </exclusion> + <exclusion> + <groupId>org.apache.arrow</groupId> + <artifactId>*</artifactId> + </exclusion> + <exclusion> + <groupId>org.apache.zookeeper</groupId> + <artifactId>*</artifactId> + </exclusion> + <exclusion> + <groupId>org.huaweicloud</groupId> + <artifactId>*</artifactId> + </exclusion> + <exclusion> + <groupId>com.fasterxml.jackson.core</groupId> + <artifactId>jackson-annotations</artifactId> + </exclusion> + </exclusions> + </dependency> + <dependency> + <groupId>org.apache.carbondata</groupId> + 
<artifactId>carbondata-hadoop</artifactId> + <version>${project.version}</version> + <exclusions> + <exclusion> + <groupId>org.apache.spark</groupId> + <artifactId>*</artifactId> + </exclusion> + <exclusion> + <groupId>org.apache.hive</groupId> + <artifactId>*</artifactId> + </exclusion> + <exclusion> + <groupId>org.apache.flink</groupId> + <artifactId>*</artifactId> + </exclusion> + <exclusion> + <groupId>org.apache.hadoop</groupId> + <artifactId>*</artifactId> + </exclusion> + <exclusion> + <groupId>org.apache.arrow</groupId> + <artifactId>*</artifactId> + </exclusion> + <exclusion> + <groupId>org.apache.zookeeper</groupId> + <artifactId>*</artifactId> + </exclusion> + <exclusion> + <groupId>org.huaweicloud</groupId> + <artifactId>*</artifactId> + </exclusion> + <exclusion> + <groupId>com.fasterxml.jackson.core</groupId> + <artifactId>jackson-annotations</artifactId> + </exclusion> + </exclusions> + </dependency> + <dependency> + <groupId>org.apache.carbondata</groupId> + <artifactId>carbondata-store-sdk</artifactId> + <version>${project.version}</version> + <exclusions> + <exclusion> + <groupId>org.apache.spark</groupId> + <artifactId>*</artifactId> + </exclusion> + <exclusion> + <groupId>org.apache.hive</groupId> + <artifactId>*</artifactId> + </exclusion> + <exclusion> + <groupId>org.apache.flink</groupId> + <artifactId>*</artifactId> + </exclusion> + <exclusion> + <groupId>org.apache.hadoop</groupId> + <artifactId>*</artifactId> + </exclusion> + <exclusion> + <groupId>org.apache.arrow</groupId> + <artifactId>*</artifactId> + </exclusion> + <exclusion> + <groupId>org.apache.zookeeper</groupId> + <artifactId>*</artifactId> + </exclusion> + <exclusion> + <groupId>org.apache.carbondata</groupId> + <artifactId>carbondata-streaming</artifactId> + </exclusion> + <exclusion> + <groupId>org.huaweicloud</groupId> + <artifactId>*</artifactId> + </exclusion> + <exclusion> + <groupId>com.fasterxml.jackson.core</groupId> + <artifactId>jackson-annotations</artifactId> + 
</exclusion> + </exclusions> + </dependency> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-common</artifactId> + <version>2.7.5</version> + </dependency> + <dependency> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.scalatest</groupId> + <artifactId>scalatest_2.11</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-client</artifactId> + <version>2.7.5</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>io.netty</groupId> + <artifactId>netty-all</artifactId> + <version>4.1.17.Final</version> + <scope>test</scope> + </dependency> + </dependencies> + + <profiles> + <profile> + <id>spark-2.2</id> + <activation> + <activeByDefault>false</activeByDefault> + </activation> + <dependencies> + <dependency> + <groupId>org.apache.carbondata</groupId> + <artifactId>carbondata-spark2</artifactId> + <version>${project.version}</version> + <scope>test</scope> + <exclusions> + <exclusion> + <groupId>org.apache.hive</groupId> + <artifactId>hive-exec</artifactId> + </exclusion> + </exclusions> + </dependency> + </dependencies> + </profile> + <profile> + <id>spark-2.3</id> + <activation> + <activeByDefault>true</activeByDefault> + </activation> + <dependencies> + <dependency> + <groupId>org.apache.carbondata</groupId> + <artifactId>carbondata-spark2</artifactId> + <version>${project.version}</version> + <scope>test</scope> + <exclusions> + <exclusion> + <groupId>org.apache.hive</groupId> + <artifactId>hive-exec</artifactId> + </exclusion> + </exclusions> + </dependency> + </dependencies> + </profile> + </profiles> + + <build> + <plugins> + <plugin> + <artifactId>maven-compiler-plugin</artifactId> + <configuration> + <source>1.7</source> + <target>1.7</target> + </configuration> + </plugin> + <plugin> + <groupId>org.scala-tools</groupId> + <artifactId>maven-scala-plugin</artifactId> 
+ <version>2.15.2</version> + <executions> + <execution> + <id>compile</id> + <goals> + <goal>compile</goal> + </goals> + <phase>compile</phase> + </execution> + <execution> + <id>testCompile</id> + <goals> + <goal>testCompile</goal> + </goals> + <phase>test</phase> + </execution> + <execution> + <goals> + <goal>compile</goal> + </goals> + <phase>process-resources</phase> + </execution> + </executions> + </plugin> + </plugins> + </build> + +</project> \ No newline at end of file diff --git a/integration/flink/src/main/java/org/apache/carbon/core/metadata/StageManager.java b/integration/flink/src/main/java/org/apache/carbon/core/metadata/StageManager.java new file mode 100644 index 0000000..ad38260 --- /dev/null +++ b/integration/flink/src/main/java/org/apache/carbon/core/metadata/StageManager.java @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.carbon.core.metadata; + +import java.io.BufferedWriter; +import java.io.ByteArrayInputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.io.OutputStreamWriter; +import java.nio.charset.StandardCharsets; + +import org.apache.carbondata.common.logging.LogServiceFactory; +import org.apache.carbondata.core.constants.CarbonCommonConstants; +import org.apache.carbondata.core.datastore.impl.FileFactory; +import org.apache.carbondata.core.fileoperations.AtomicFileOperationFactory; +import org.apache.carbondata.core.fileoperations.AtomicFileOperations; +import org.apache.carbondata.core.fileoperations.FileWriteOperation; +import org.apache.carbondata.core.statusmanager.StageInput; +import org.apache.carbondata.core.util.CarbonUtil; +import org.apache.carbondata.core.util.path.CarbonTablePath; + +import com.google.gson.Gson; +import org.apache.hadoop.io.IOUtils; +import org.apache.log4j.Logger; + +public final class StageManager { + + private static final Logger LOGGER = + LogServiceFactory.getLogService(StageManager.class.getName()); + + public static void writeStageInput(final String stageInputPath, final StageInput stageInput) + throws IOException { + AtomicFileOperations fileWrite = + AtomicFileOperationFactory.getAtomicFileOperations(stageInputPath); + BufferedWriter writer = null; + DataOutputStream dataOutputStream = null; + try { + dataOutputStream = fileWrite.openForWrite(FileWriteOperation.OVERWRITE); + writer = new BufferedWriter(new OutputStreamWriter(dataOutputStream, StandardCharsets.UTF_8)); + String metadataInstance = new Gson().toJson(stageInput); + writer.write(metadataInstance); + } catch (IOException e) { + LOGGER.error("Error message: " + e.getLocalizedMessage()); + fileWrite.setFailed(); + throw e; + } finally { + if (null != writer) { + writer.flush(); + } + CarbonUtil.closeStreams(writer); + fileWrite.close(); + } + + try { + writeSuccessFile(stageInputPath + 
CarbonTablePath.SUCCESS_FILE_SUBFIX); + } catch (Throwable exception) { + try { + CarbonUtil.deleteFoldersAndFiles(FileFactory.getCarbonFile(stageInputPath)); + } catch (Throwable e) { + LOGGER.error("Fail to delete stage input meta data [" + stageInputPath + "].", exception); + } + throw exception; + } + } + + private static void writeSuccessFile(final String successFilePath) throws IOException { + final DataOutputStream segmentStatusSuccessOutputStream = + FileFactory.getDataOutputStream( + successFilePath, FileFactory.getFileType(successFilePath), + CarbonCommonConstants.BYTEBUFFER_SIZE, 1024); + try { + IOUtils.copyBytes( + new ByteArrayInputStream(new byte[0]), + segmentStatusSuccessOutputStream, + CarbonCommonConstants.BYTEBUFFER_SIZE); + segmentStatusSuccessOutputStream.flush(); + } finally { + try { + CarbonUtil.closeStream(segmentStatusSuccessOutputStream); + } catch (IOException exception) { + LOGGER.error(exception.getMessage(), exception); + } + } + } + +} diff --git a/integration/flink/src/main/java/org/apache/carbon/flink/CarbonLocalProperty.java b/integration/flink/src/main/java/org/apache/carbon/flink/CarbonLocalProperty.java new file mode 100644 index 0000000..3383e8c --- /dev/null +++ b/integration/flink/src/main/java/org/apache/carbon/flink/CarbonLocalProperty.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbon.flink; + +public final class CarbonLocalProperty { + + public static final String DATA_TEMP_PATH = "carbon.writer.local.data.temp.path"; + + public static final String DATA_PATH = "carbon.writer.local.data.path"; + + private CarbonLocalProperty() { + // private constructor. + } + +} diff --git a/integration/flink/src/main/java/org/apache/carbon/flink/CarbonLocalWriter.java b/integration/flink/src/main/java/org/apache/carbon/flink/CarbonLocalWriter.java new file mode 100644 index 0000000..dcfe8b3 --- /dev/null +++ b/integration/flink/src/main/java/org/apache/carbon/flink/CarbonLocalWriter.java @@ -0,0 +1,233 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.carbon.flink; + +import java.io.File; +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; +import java.util.Properties; + +import org.apache.carbondata.common.logging.LogServiceFactory; +import org.apache.carbondata.core.datastore.exception.CarbonDataWriterException; +import org.apache.carbondata.core.datastore.impl.FileFactory; +import org.apache.carbondata.core.metadata.schema.table.CarbonTable; +import org.apache.carbondata.core.statusmanager.StageInput; +import org.apache.carbondata.core.util.CarbonUtil; +import org.apache.carbondata.core.util.path.CarbonTablePath; + +import org.apache.carbon.core.metadata.StageManager; +import org.apache.commons.io.FileUtils; +import org.apache.log4j.Logger; + +final class CarbonLocalWriter extends CarbonWriter { + + private static final Logger LOGGER = + LogServiceFactory.getLogService(CarbonLocalWriter.class.getName()); + + CarbonLocalWriter( + final CarbonLocalWriterFactory factory, + final CarbonTable table, + final org.apache.carbondata.sdk.file.CarbonWriter writer, + final String writePath, + final String writePartition + ) { + ProxyFileWriterFactory.register(factory.getType(), factory.getClass()); + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("Open writer. 
" + this.toString()); + } + this.factory = factory; + this.table = table; + this.writer = writer; + this.writePath = writePath; + this.writePartition = writePartition; + this.flushed = true; + } + + private final CarbonLocalWriterFactory factory; + + private final CarbonTable table; + + private final org.apache.carbondata.sdk.file.CarbonWriter writer; + + private final String writePath; + + private final String writePartition; + + private volatile boolean flushed; + + @Override + public CarbonLocalWriterFactory getFactory() { + return this.factory; + } + + @Override + public String getPartition() { + return this.writePartition; + } + + @Override + public void addElement(final String element) throws IOException { + this.writer.write(element); + this.flushed = false; + } + + @Override + public void flush() throws IOException { + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("Flush writer. " + this.toString()); + } + synchronized (this) { + if (!this.flushed) { + this.writer.close(); + this.flushed = true; + } + } + } + + @Override + public void finish() throws IOException { + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("Finish writer. " + this.toString()); + } + if (!this.flushed) { + this.flush(); + } + } + + @Override + public void commit() throws IOException { + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("Commit write. " + this.toString()); + } + try { + final Properties writerProperties = this.factory.getConfiguration().getWriterProperties(); + String dataPath = writerProperties.getProperty(CarbonLocalProperty.DATA_PATH); + if (dataPath == null) { + throw new IllegalArgumentException( + "Writer property [" + CarbonLocalProperty.DATA_PATH + "] is not set." 
+ ); + } + dataPath = dataPath + this.table.getDatabaseName() + "/" + + this.table.getTableName() + "/" + this.writePartition + "/"; + Map<String, Long> fileList = + this.uploadSegmentDataFiles(this.writePath + "Fact/Part0/Segment_null/", dataPath); + try { + String stageDir = CarbonTablePath.getStageDir( + table.getAbsoluteTableIdentifier().getTablePath()); + tryCreateLocalDirectory(new File(stageDir)); + String stageInputPath = stageDir + "/" + this.writePartition; + StageManager.writeStageInput(stageInputPath, new StageInput(dataPath, fileList)); + } catch (Throwable exception) { + this.deleteSegmentDataFilesQuietly(dataPath); + throw exception; + } + } finally { + try { + FileUtils.deleteDirectory(new File(this.writePath)); + } catch (IOException exception) { + LOGGER.error("Fail to delete write path [" + this.writePath + "].", exception); + } + } + } + + @Override + public void close() { + if (this.writer == null) { + return; + } + try { + synchronized (this) { + if (!this.flushed) { + this.writer.close(); + this.flushed = true; + } + } + } catch (Throwable exception) { + LOGGER.error("Fail to close carbon writer.", exception); + } finally { + try { + FileUtils.deleteDirectory(new File(this.writePath)); + } catch (IOException exception) { + LOGGER.error("Fail to delete write path [" + this.writePath + "].", exception); + } + } + } + + private Map<String, Long> uploadSegmentDataFiles(final String localPath, final String remotePath) + throws IOException { + final File[] files = new File(localPath).listFiles(); + if (files == null) { + return new HashMap<>(0); + } + Map<String, Long> fileNameMapLength = new HashMap<>(files.length); + for (File file : files) { + fileNameMapLength.put(file.getName(), file.length()); + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("Upload file[" + file.getAbsolutePath() + "] to [" + remotePath + "] start."); + } + try { + final File remoteFile = new File(remotePath + file.getName()); + if (!remoteFile.exists()) { + 
tryCreateLocalFile(remoteFile); + } + CarbonUtil.copyCarbonDataFileToCarbonStorePath(file.getAbsolutePath(), remotePath, 1024); + } catch (CarbonDataWriterException exception) { + LOGGER.error(exception.getMessage(), exception); + throw exception; + } + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("Upload file[" + file.getAbsolutePath() + "] to [" + remotePath + "] end."); + } + } + return fileNameMapLength; + } + + private void deleteSegmentDataFilesQuietly(final String segmentDataPath) { + try { + CarbonUtil.deleteFoldersAndFiles(FileFactory.getCarbonFile(segmentDataPath)); + } catch (Throwable exception) { + LOGGER.error("Fail to delete segment data path [" + segmentDataPath + "].", exception); + } + } + + private static void tryCreateLocalFile(final File file) throws IOException { + if (file.exists()) { + return; + } + if (file.getParentFile() != null) { + tryCreateLocalDirectory(file.getParentFile()); + } + if (!file.createNewFile()) { + throw new IOException("File [" + file.getCanonicalPath() + "] is exist."); + } + } + + private static void tryCreateLocalDirectory(final File file) throws IOException { + if (file.exists()) { + return; + } + if (file.getParentFile() != null) { + tryCreateLocalDirectory(file.getParentFile()); + } + if (!file.mkdir() && LOGGER.isDebugEnabled()) { + LOGGER.debug("Directory [" + file.getCanonicalPath() + "] is exist."); + } + } + +} diff --git a/integration/flink/src/main/java/org/apache/carbon/flink/CarbonLocalWriterFactory.java b/integration/flink/src/main/java/org/apache/carbon/flink/CarbonLocalWriterFactory.java new file mode 100644 index 0000000..4c24a8b --- /dev/null +++ b/integration/flink/src/main/java/org/apache/carbon/flink/CarbonLocalWriterFactory.java @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbon.flink;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.UUID;
+
+import org.apache.carbondata.common.exceptions.sql.InvalidLoadOptionException;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.metadata.schema.table.TableInfo;
+import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
+import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
+import org.apache.carbondata.sdk.file.CarbonWriter;
+import org.apache.carbondata.sdk.file.Schema;
+
+/**
+ * Writer factory that writes carbon data files with the carbon SDK writer into a local
+ * temporary directory derived from the {@link CarbonLocalProperty#DATA_TEMP_PATH} writer
+ * property.
+ */
+public final class CarbonLocalWriterFactory extends CarbonWriterFactory {
+
+  private static final long serialVersionUID = 2822670807460968078L;
+
+  @Override
+  public String getType() {
+    return CarbonLocalWriterFactoryBuilder.TYPE;
+  }
+
+  /**
+   * Creates a writer for a newly generated write partition.
+   * <p>
+   * A random partition id is generated. The table is cloned, and the clone's path is pointed
+   * at {@code <DATA_TEMP_PATH>_<partition>/}. An SDK writer that accepts JSON rows is then
+   * built against the clone.
+   *
+   * @throws IllegalArgumentException if {@link CarbonLocalProperty#DATA_TEMP_PATH} is not set
+   * @throws IOException if the table cannot be loaded or the SDK writer cannot be built
+   */
+  @Override
+  protected CarbonLocalWriter create0() throws IOException {
+    final String writeTempPath = this.getWriteTempPath();
+    final String writePartition = UUID.randomUUID().toString().replace("-", "");
+    final String writePath = writeTempPath + "_" + writePartition + "/";
+    final CarbonTable table = this.getTable();
+    // Work on a serialized copy so that redirecting the table path to the temporary
+    // directory leaves `table` itself untouched.
+    final CarbonTable clonedTable =
+        CarbonTable.buildFromTableInfo(TableInfo.deserialize(table.getTableInfo().serialize()));
+    clonedTable.getTableInfo().setTablePath(writePath);
+    final org.apache.carbondata.sdk.file.CarbonWriter writer;
+    try {
+      writer = CarbonWriter.builder()
+          .outputPath("")
+          .writtenBy("flink")
+          .withTable(clonedTable)
+          .withTableProperties(this.getTableProperties())
+          .withJsonInput(this.getTableSchema(clonedTable))
+          .build();
+    } catch (InvalidLoadOptionException exception) {
+      // TODO: map invalid load options to a dedicated exception type.
+      throw new UnsupportedOperationException(exception);
+    }
+    return new CarbonLocalWriter(this, table, writer, writePath, writePartition);
+  }
+
+  /**
+   * Creates a writer bound to an existing write partition, without an SDK writer.
+   * <p>
+   * The returned writer is constructed with a {@code null} SDK writer. Presumably it is used
+   * to commit or clean up a partition that already exists (TODO confirm against
+   * ProxyFileWriterFactory callers).
+   *
+   * @param partition id of the existing write partition
+   * @throws IllegalArgumentException if {@link CarbonLocalProperty#DATA_TEMP_PATH} is not set
+   */
+  @Override
+  protected CarbonLocalWriter create0(final String partition) throws IOException {
+    final String writeTempPath = this.getWriteTempPath();
+    final String writePath = writeTempPath + "_" + partition + "/";
+    final CarbonTable table = this.getTable();
+    return new CarbonLocalWriter(this, table, null, writePath, partition);
+  }
+
+  /**
+   * Returns the mandatory {@link CarbonLocalProperty#DATA_TEMP_PATH} writer property.
+   *
+   * @throws IllegalArgumentException if the property is not set
+   */
+  private String getWriteTempPath() {
+    final Properties writerProperties = this.getConfiguration().getWriterProperties();
+    final String writeTempPath = writerProperties.getProperty(CarbonLocalProperty.DATA_TEMP_PATH);
+    if (writeTempPath == null) {
+      throw new IllegalArgumentException(
+          "Writer property [" + CarbonLocalProperty.DATA_TEMP_PATH + "] is not set."
+      );
+    }
+    return writeTempPath;
+  }
+
+  /** Builds an SDK {@link Schema} from the table's columns, in creation order. */
+  private Schema getTableSchema(final CarbonTable table) {
+    final List<CarbonColumn> columnList = table.getCreateOrderColumn();
+    final List<ColumnSchema> columnSchemaList = new ArrayList<>(columnList.size());
+    for (CarbonColumn column : columnList) {
+      columnSchemaList.add(column.getColumnSchema());
+    }
+    return new Schema(columnSchemaList);
+  }
+
+  /** Copies the configured table properties into a plain string map for the SDK builder. */
+  private Map<String, String> getTableProperties() {
+    final Properties tableProperties = this.getConfiguration().getTableProperties();
+    final Map<String, String> tablePropertyMap = new HashMap<>(tableProperties.size());
+    for (String propertyName : tableProperties.stringPropertyNames()) {
+      tablePropertyMap.put(propertyName, tableProperties.getProperty(propertyName));
+    }
+    return tablePropertyMap;
+  }
+
+}
diff --git a/integration/flink/src/main/java/org/apache/carbon/flink/CarbonLocalWriterFactoryBuilder.java b/integration/flink/src/main/java/org/apache/carbon/flink/CarbonLocalWriterFactoryBuilder.java
new file mode 100644
index 0000000..74d1e41
--- /dev/null
+++ b/integration/flink/src/main/java/org/apache/carbon/flink/CarbonLocalWriterFactoryBuilder.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.
You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbon.flink;
+
+import java.util.Properties;
+
+/**
+ * Builder producing {@link CarbonLocalWriterFactory} instances; identified by the type name
+ * {@value #TYPE}.
+ */
+public final class CarbonLocalWriterFactoryBuilder extends CarbonWriterFactoryBuilder {
+
+  /** Type name this builder answers to. */
+  public static final String TYPE = "Local";
+
+  @Override
+  public String getType() {
+    return TYPE;
+  }
+
+  /**
+   * Creates a local writer factory that carries the given table coordinates and the three
+   * property sets (table, writer, carbon).
+   */
+  @Override
+  public CarbonLocalWriterFactory build(
+      final String databaseName,
+      final String tableName,
+      final String tablePath,
+      final Properties tableProperties,
+      final Properties writerProperties,
+      final Properties carbonProperties
+  ) {
+    final ProxyFileWriterFactory.Configuration factoryConfiguration =
+        new ProxyFileWriterFactory.Configuration(databaseName, tableName, tablePath,
+            tableProperties, writerProperties, carbonProperties);
+    final CarbonLocalWriterFactory localFactory = new CarbonLocalWriterFactory();
+    localFactory.setConfiguration(factoryConfiguration);
+    return localFactory;
+  }
+}
diff --git a/integration/flink/src/main/java/org/apache/carbon/flink/CarbonS3Property.java b/integration/flink/src/main/java/org/apache/carbon/flink/CarbonS3Property.java
new file mode 100644
index 0000000..6d9d94b
--- /dev/null
+++ b/integration/flink/src/main/java/org/apache/carbon/flink/CarbonS3Property.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.
You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbon.flink;
+
+/**
+ * Names of the writer properties consumed by the S3 writer factory and the S3 writer.
+ */
+final class CarbonS3Property {
+
+  // ---- S3 connection settings ----
+
+  /** S3 endpoint URL. */
+  static final String ENDPOINT = "carbon.writer.s3.endpoint";
+
+  /** S3 access key. */
+  static final String ACCESS_KEY = "carbon.writer.s3.access.key";
+
+  /** S3 secret key. */
+  static final String SECRET_KEY = "carbon.writer.s3.secret.key";
+
+  // ---- Data locations ----
+
+  /** Prefix of the local temporary directory the SDK writer writes into before upload. */
+  static final String DATA_TEMP_PATH = "carbon.writer.s3.data.temp.path";
+
+  /** Remote root directory (must be an s3a path) that data files are uploaded to on commit. */
+  static final String DATA_PATH = "carbon.writer.s3.data.path";
+
+  /** Constants holder; never instantiated. */
+  private CarbonS3Property() {
+  }
+
+}
diff --git a/integration/flink/src/main/java/org/apache/carbon/flink/CarbonS3Writer.java b/integration/flink/src/main/java/org/apache/carbon/flink/CarbonS3Writer.java
new file mode 100644
index 0000000..e5aeca4
--- /dev/null
+++ b/integration/flink/src/main/java/org/apache/carbon/flink/CarbonS3Writer.java
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbon.flink;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datastore.exception.CarbonDataWriterException;
+import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.statusmanager.StageInput;
+import org.apache.carbondata.core.util.CarbonUtil;
+import org.apache.carbondata.core.util.ThreadLocalSessionInfo;
+import org.apache.carbondata.core.util.path.CarbonTablePath;
+
+import org.apache.carbon.core.metadata.StageManager;
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.log4j.Logger;
+
+/**
+ * Carbon writer that writes data files to a local temporary path with the carbon SDK writer.
+ * On commit, it uploads those files to an s3a location and records them as a stage input of
+ * the target table.
+ */
+final class CarbonS3Writer extends CarbonWriter {
+
+  private static final Logger LOGGER =
+      LogServiceFactory.getLogService(CarbonS3Writer.class.getName());
+
+  CarbonS3Writer(
+      final CarbonS3WriterFactory factory,
+      final CarbonTable table,
+      final org.apache.carbondata.sdk.file.CarbonWriter writer,
+      final String writePath,
+      final String writePartition,
+      final Configuration configuration
+  ) {
+    ProxyFileWriterFactory.register(factory.getType(), factory.getClass());
+    if (LOGGER.isDebugEnabled()) {
+      LOGGER.debug("Open writer. " + this.toString());
+    }
+    this.factory = factory;
+    this.table = table;
+    this.writer = writer;
+    this.writePath = writePath;
+    this.writePartition = writePartition;
+    this.configuration = configuration;
+    this.flushed = true;
+  }
+
+  private final CarbonS3WriterFactory factory;
+
+  private final CarbonTable table;
+
+  // SDK writer; null when the factory created this writer for an existing partition.
+  private final org.apache.carbondata.sdk.file.CarbonWriter writer;
+
+  // Local directory the SDK writer writes into; deleted after commit/close.
+  private final String writePath;
+
+  private final String writePartition;
+
+  // Hadoop configuration carrying the S3 credentials and endpoint.
+  private final Configuration configuration;
+
+  // True when nothing has been written since the SDK writer was last closed.
+  private volatile boolean flushed;
+
+  @Override
+  public CarbonS3WriterFactory getFactory() {
+    return this.factory;
+  }
+
+  @Override
+  public String getPartition() {
+    return this.writePartition;
+  }
+
+  @Override
+  public void addElement(final String element) throws IOException {
+    this.writer.write(element);
+    this.flushed = false;
+  }
+
+  /**
+   * Closes the SDK writer if anything was written since the last flush.
+   * NOTE(review): this closes rather than flushes the SDK writer, so presumably no further
+   * elements can be added afterwards; confirm.
+   */
+  @Override
+  public void flush() throws IOException {
+    if (LOGGER.isDebugEnabled()) {
+      LOGGER.debug("Flush writer. " + this.toString());
+    }
+    synchronized (this) {
+      if (!this.flushed) {
+        this.writer.close();
+        this.flushed = true;
+      }
+    }
+  }
+
+  @Override
+  public void finish() throws IOException {
+    if (LOGGER.isDebugEnabled()) {
+      LOGGER.debug("Finish writer. " + this.toString());
+    }
+    if (!this.flushed) {
+      this.flush();
+    }
+  }
+
+  /**
+   * Publishes the locally written data files.
+   * <p>
+   * Every file in the local segment directory is copied to
+   * {@code <DATA_PATH>/<database>/<table>/<partition>/}. A {@link StageInput} listing those
+   * files is then written to the table's stage directory. If either the upload or the stage
+   * write fails, the remote data directory is removed (best effort) and the failure is
+   * rethrown. The local write path is always deleted afterwards.
+   *
+   * @throws IllegalArgumentException if {@link CarbonS3Property#DATA_PATH} is missing or is not
+   *     an s3a path
+   * @throws IOException if writing the stage input fails
+   */
+  @Override
+  public void commit() throws IOException {
+    if (LOGGER.isDebugEnabled()) {
+      LOGGER.debug("Commit write. " + this.toString());
+    }
+    // Make the S3 configuration visible to carbon's file layer on this thread.
+    ThreadLocalSessionInfo.setConfigurationToCurrentThread(this.configuration);
+    ThreadLocalSessionInfo.getOrCreateCarbonSessionInfo()
+        .getNonSerializableExtraInfo().put("carbonConf", this.configuration);
+    try {
+      final String dataPath = this.getRemoteDataPath();
+      try {
+        final Map<String, Long> fileList =
+            this.uploadSegmentDataFiles(this.writePath + "Fact/Part0/Segment_null/", dataPath);
+        final String stageInputPath = CarbonTablePath.getStageDir(
+            table.getAbsoluteTableIdentifier().getTablePath())
+            + CarbonCommonConstants.FILE_SEPARATOR + this.writePartition;
+        StageManager.writeStageInput(stageInputPath, new StageInput(dataPath, fileList));
+      } catch (Throwable exception) {
+        // The upload is inside this try so that a failed (partial) upload is cleaned up too,
+        // not only a failed stage write.
+        this.deleteSegmentDataFilesQuietly(dataPath);
+        throw exception;
+      }
+    } finally {
+      try {
+        FileUtils.deleteDirectory(new File(this.writePath));
+      } catch (IOException exception) {
+        LOGGER.error("Fail to delete write path [" + this.writePath + "].", exception);
+      }
+    }
+  }
+
+  /**
+   * Closes the SDK writer (if it has unflushed data) and deletes the local write path.
+   * Does nothing when this writer has no SDK writer. Errors are logged, not thrown.
+   */
+  @Override
+  public void close() {
+    if (this.writer == null) {
+      return;
+    }
+    try {
+      synchronized (this) {
+        if (!this.flushed) {
+          this.writer.close();
+          this.flushed = true;
+        }
+      }
+    } catch (Throwable exception) {
+      LOGGER.error("Fail to close carbon writer.", exception);
+    } finally {
+      try {
+        FileUtils.deleteDirectory(new File(this.writePath));
+      } catch (IOException exception) {
+        LOGGER.error("Fail to delete write path [" + this.writePath + "].", exception);
+      }
+    }
+  }
+
+  /**
+   * Resolves the remote directory of this partition:
+   * {@code <DATA_PATH>/<database>/<table>/<partition>/}.
+   *
+   * @throws IllegalArgumentException if {@link CarbonS3Property#DATA_PATH} is missing or is not
+   *     an s3a path
+   */
+  private String getRemoteDataPath() {
+    final Properties writerProperties = this.factory.getConfiguration().getWriterProperties();
+    String dataPath = writerProperties.getProperty(CarbonS3Property.DATA_PATH);
+    if (dataPath == null) {
+      throw new IllegalArgumentException(
+          "Writer property [" + CarbonS3Property.DATA_PATH + "] is not set."
+      );
+    }
+    if (!dataPath.startsWith(CarbonCommonConstants.S3A_PREFIX)) {
+      throw new IllegalArgumentException(
+          "Writer property [" + CarbonS3Property.DATA_PATH + "] is not a s3a path."
+      );
+    }
+    // Accept DATA_PATH with or without a trailing separator; without this, the database
+    // name would be glued onto the last segment of DATA_PATH.
+    if (!dataPath.endsWith(CarbonCommonConstants.FILE_SEPARATOR)) {
+      dataPath = dataPath + CarbonCommonConstants.FILE_SEPARATOR;
+    }
+    return dataPath + this.table.getDatabaseName() + CarbonCommonConstants.FILE_SEPARATOR
+        + this.table.getTableName() + CarbonCommonConstants.FILE_SEPARATOR
+        + this.writePartition + CarbonCommonConstants.FILE_SEPARATOR;
+  }
+
+  /**
+   * Copies every file in {@code localPath} to {@code remotePath}.
+   *
+   * @return file name to file length (bytes) of every file found; empty if {@code localPath}
+   *     cannot be listed
+   */
+  private Map<String, Long> uploadSegmentDataFiles(
+      final String localPath, final String remotePath) {
+    final File[] files = new File(localPath).listFiles();
+    if (files == null) {
+      return new HashMap<>(0);
+    }
+    Map<String, Long> fileNameMapLength = new HashMap<>(files.length);
+    for (File file : files) {
+      fileNameMapLength.put(file.getName(), file.length());
+      if (LOGGER.isDebugEnabled()) {
+        LOGGER.debug("Upload file[" + file.getAbsolutePath() + "] to [" + remotePath + "] start.");
+      }
+      try {
+        CarbonUtil.copyCarbonDataFileToCarbonStorePath(file.getAbsolutePath(), remotePath, 1024);
+      } catch (CarbonDataWriterException exception) {
+        LOGGER.error(exception.getMessage(), exception);
+        throw exception;
+      }
+      if (LOGGER.isDebugEnabled()) {
+        LOGGER.debug("Upload file[" + file.getAbsolutePath() + "] to [" + remotePath + "] end.");
+      }
+    }
+    return fileNameMapLength;
+  }
+
+  /** Deletes the given remote directory, logging (not throwing) any failure. */
+  private void deleteSegmentDataFilesQuietly(final String segmentDataPath) {
+    try {
+      CarbonUtil.deleteFoldersAndFiles(FileFactory.getCarbonFile(segmentDataPath));
+    } catch (Throwable exception) {
+      LOGGER.error("Fail to delete segment data path [" + segmentDataPath + "].", exception);
+    }
+  }
+}
diff --git a/integration/flink/src/main/java/org/apache/carbon/flink/CarbonS3WriterFactory.java b/integration/flink/src/main/java/org/apache/carbon/flink/CarbonS3WriterFactory.java
new file mode 100644
index 0000000..f1ab483
--- /dev/null
+++ b/integration/flink/src/main/java/org/apache/carbon/flink/CarbonS3WriterFactory.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbon.flink;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.UUID;
+
+import org.apache.carbondata.common.exceptions.sql.InvalidLoadOptionException;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.metadata.schema.table.TableInfo;
+import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
+import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
+import org.apache.carbondata.core.util.ThreadLocalSessionInfo;
+import org.apache.carbondata.sdk.file.CarbonWriter;
+import org.apache.carbondata.sdk.file.Schema;
+
+/**
+ * Writer factory for {@link CarbonS3Writer}. It builds SDK writers that write locally under
+ * {@link CarbonS3Property#DATA_TEMP_PATH} with a Hadoop configuration carrying the S3
+ * credentials and endpoint taken from the writer properties.
+ */
+public final class CarbonS3WriterFactory extends CarbonWriterFactory {
+
+  private static final long serialVersionUID = 2302824357711095245L;
+
+  @Override
+  public String getType() {
+    return CarbonS3WriterFactoryBuilder.TYPE;
+  }
+
+  /**
+   * Creates a writer for a newly generated write partition.
+   * <p>
+   * A random partition id is generated. The table is cloned, and the clone's path is pointed
+   * at {@code <DATA_TEMP_PATH>_<partition>/}. An SDK writer that accepts JSON rows is then
+   * built against the clone using the S3 Hadoop configuration.
+   *
+   * @throws IllegalArgumentException if a required writer property is not set
+   * @throws IOException if the table cannot be loaded or the SDK writer cannot be built
+   */
+  @Override
+  protected CarbonS3Writer create0() throws IOException {
+    final String writeTempPath = this.getRequiredWriterProperty(CarbonS3Property.DATA_TEMP_PATH);
+    final String writePartition = UUID.randomUUID().toString().replace("-", "");
+    final String writePath = writeTempPath + "_" + writePartition + "/";
+    final CarbonTable table = this.getTable();
+    // Work on a serialized copy so that redirecting the table path to the temporary
+    // directory leaves `table` itself untouched.
+    final CarbonTable clonedTable =
+        CarbonTable.buildFromTableInfo(TableInfo.deserialize(table.getTableInfo().serialize()));
+    clonedTable.getTableInfo().setTablePath(writePath);
+    final org.apache.hadoop.conf.Configuration configuration = this.getS3Configuration();
+    final CarbonWriter writer;
+    try {
+      writer = CarbonWriter.builder()
+          .outputPath("")
+          .writtenBy("flink")
+          .withTable(clonedTable)
+          .withTableProperties(this.getTableProperties())
+          .withJsonInput(this.getTableSchema(clonedTable))
+          .withHadoopConf(configuration)
+          .build();
+    } catch (InvalidLoadOptionException exception) {
+      // TODO: map invalid load options to a dedicated exception type.
+      throw new UnsupportedOperationException(exception);
+    }
+    return new CarbonS3Writer(this, table, writer, writePath, writePartition, configuration);
+  }
+
+  /**
+   * Creates a writer bound to an existing write partition, without an SDK writer
+   * (a {@code null} writer is passed to {@link CarbonS3Writer}).
+   *
+   * @param partition id of the existing write partition
+   * @throws IllegalArgumentException if a required writer property is not set
+   */
+  @Override
+  protected CarbonS3Writer create0(final String partition) throws IOException {
+    final String writeTempPath = this.getRequiredWriterProperty(CarbonS3Property.DATA_TEMP_PATH);
+    final String writePath = writeTempPath + "_" + partition + "/";
+    final CarbonTable table = this.getTable();
+    final org.apache.hadoop.conf.Configuration configuration = this.getS3Configuration();
+    return new CarbonS3Writer(this, table, null, writePath, partition, configuration);
+  }
+
+  /**
+   * Loads the table after installing the S3 configuration on the current thread, so that the
+   * table metadata can be read from S3.
+   */
+  @Override
+  protected CarbonTable getTable() throws IOException {
+    this.setS3Configuration(this.getS3Configuration());
+    return super.getTable();
+  }
+
+  /**
+   * Returns the value of a mandatory writer property.
+   *
+   * @throws IllegalArgumentException if the property is not set
+   */
+  private String getRequiredWriterProperty(final String propertyName) {
+    final Properties writerProperties = this.getConfiguration().getWriterProperties();
+    final String value = writerProperties.getProperty(propertyName);
+    if (value == null) {
+      throw new IllegalArgumentException(
+          "Writer property [" + propertyName + "] is not set."
+      );
+    }
+    return value;
+  }
+
+  /** Builds an SDK {@link Schema} from the table's columns, in creation order. */
+  private Schema getTableSchema(final CarbonTable table) {
+    final List<CarbonColumn> columnList = table.getCreateOrderColumn();
+    final List<ColumnSchema> columnSchemaList = new ArrayList<>(columnList.size());
+    for (CarbonColumn column : columnList) {
+      columnSchemaList.add(column.getColumnSchema());
+    }
+    return new Schema(columnSchemaList);
+  }
+
+  /** Copies the configured table properties into a plain string map for the SDK builder. */
+  private Map<String, String> getTableProperties() {
+    final Properties tableProperties = this.getConfiguration().getTableProperties();
+    final Map<String, String> tablePropertyMap = new HashMap<>(tableProperties.size());
+    for (String propertyName : tableProperties.stringPropertyNames()) {
+      tablePropertyMap.put(propertyName, tableProperties.getProperty(propertyName));
+    }
+    return tablePropertyMap;
+  }
+
+  /**
+   * Builds a Hadoop configuration that maps both the {@code s3} and {@code s3a} schemes to
+   * S3AFileSystem with the configured credentials and endpoint.
+   *
+   * @throws IllegalArgumentException if the access key, secret key or endpoint is not set
+   */
+  private org.apache.hadoop.conf.Configuration getS3Configuration() {
+    final String accessKey = this.getRequiredWriterProperty(CarbonS3Property.ACCESS_KEY);
+    final String secretKey = this.getRequiredWriterProperty(CarbonS3Property.SECRET_KEY);
+    final String endpoint = this.getRequiredWriterProperty(CarbonS3Property.ENDPOINT);
+    final org.apache.hadoop.conf.Configuration configuration =
+        new org.apache.hadoop.conf.Configuration(true);
+    configuration.set("fs.s3.access.key", accessKey);
+    configuration.set("fs.s3.secret.key", secretKey);
+    configuration.set("fs.s3.endpoint", endpoint);
+    configuration.set("fs.s3.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem");
+    // NOTE(review): presumably for code paths that read Spark-prefixed keys; confirm.
+    configuration.set("spark.hadoop.fs.s3.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem");
+    configuration.set("fs.s3a.access.key", accessKey);
+    configuration.set("fs.s3a.secret.key", secretKey);
+    configuration.set("fs.s3a.endpoint", endpoint);
+    configuration.set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem");
+    configuration.set("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem");
+    return configuration;
+  }
+
+  /** Installs the given configuration as the current thread's carbon session configuration. */
+  private void setS3Configuration(final org.apache.hadoop.conf.Configuration configuration) {
+    ThreadLocalSessionInfo.setConfigurationToCurrentThread(configuration);
+  }
+
+}
diff --git a/integration/flink/src/main/java/org/apache/carbon/flink/CarbonS3WriterFactoryBuilder.java b/integration/flink/src/main/java/org/apache/carbon/flink/CarbonS3WriterFactoryBuilder.java
new file mode 100644
index 0000000..afd9e5e
--- /dev/null
+++ b/integration/flink/src/main/java/org/apache/carbon/flink/CarbonS3WriterFactoryBuilder.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.
You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbon.flink;
+
+import java.util.Properties;
+
+/**
+ * Builder producing {@link CarbonS3WriterFactory} instances; identified by the type name
+ * {@value #TYPE}.
+ */
+public final class CarbonS3WriterFactoryBuilder extends CarbonWriterFactoryBuilder {
+
+  /** Type name this builder answers to. */
+  public static final String TYPE = "S3";
+
+  @Override
+  public String getType() {
+    return TYPE;
+  }
+
+  /**
+   * Creates an S3 writer factory that carries the given table coordinates and the three
+   * property sets (table, writer, carbon).
+   */
+  @Override
+  public CarbonS3WriterFactory build(
+      final String databaseName,
+      final String tableName,
+      final String tablePath,
+      final Properties tableProperties,
+      final Properties writerProperties,
+      final Properties carbonProperties
+  ) {
+    final ProxyFileWriterFactory.Configuration factoryConfiguration =
+        new ProxyFileWriterFactory.Configuration(databaseName, tableName, tablePath,
+            tableProperties, writerProperties, carbonProperties);
+    final CarbonS3WriterFactory s3Factory = new CarbonS3WriterFactory();
+    s3Factory.setConfiguration(factoryConfiguration);
+    return s3Factory;
+  }
+
+}
diff --git a/integration/flink/src/main/java/org/apache/carbon/flink/CarbonWriter.java b/integration/flink/src/main/java/org/apache/carbon/flink/CarbonWriter.java
new file mode 100644
index 0000000..d5ddac5
--- /dev/null
+++ b/integration/flink/src/main/java/org/apache/carbon/flink/CarbonWriter.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.
You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbon.flink;
+
+/**
+ * Base class of the Flink-side carbon writers: a {@link ProxyFileWriter} whose elements are
+ * strings, implemented by wrapping the carbon SDK {@code CarbonWriter}.
+ * <p>
+ * Subclasses write each element to carbon through the SDK writer and publish the written data
+ * files on commit. For example, the S3 implementation uploads the files and records them as a
+ * stage input of the table.
+ * NOTE(review): an earlier description said this class also generates a segment file; confirm
+ * whether any implementation actually does.
+ */
+public abstract class CarbonWriter extends ProxyFileWriter<String> {
+
+}
diff --git a/integration/flink/src/main/java/org/apache/carbon/flink/CarbonWriterFactory.java b/integration/flink/src/main/java/org/apache/carbon/flink/CarbonWriterFactory.java
new file mode 100644
index 0000000..d3257c9
--- /dev/null
+++ b/integration/flink/src/main/java/org/apache/carbon/flink/CarbonWriterFactory.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbon.flink;
+
+import java.io.IOException;
+
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.util.CarbonProperties;
+
+import org.apache.flink.core.fs.FSDataOutputStream;
+
+/**
+ * Base {@link ProxyFileWriterFactory} for carbon writers.
+ * <p>
+ * Before each writer is created, the configured carbon properties are copied into the
+ * {@link CarbonProperties} instance returned by {@code CarbonProperties.getInstance()}.
+ * Construction itself is delegated to {@link #create0()} / {@link #create0(String)}.
+ */
+public abstract class CarbonWriterFactory extends ProxyFileWriterFactory<String> {
+
+  /** Returns the builder registered for the given writer type (for example "Local" or "S3"). */
+  public static CarbonWriterFactoryBuilder builder(final String type) {
+    return CarbonWriterFactoryBuilder.get(type);
+  }
+
+  /**
+   * Creates a new writer and binds it to the given proxy output stream.
+   *
+   * @throws IllegalArgumentException if {@code out} is not a {@link ProxyRecoverableOutputStream}
+   */
+  @Override
+  public CarbonWriter create(final FSDataOutputStream out) throws IOException {
+    if (out instanceof ProxyRecoverableOutputStream) {
+      this.setCarbonProperties();
+      final CarbonWriter createdWriter = this.create0();
+      final ProxyRecoverableOutputStream proxyOut = (ProxyRecoverableOutputStream) out;
+      proxyOut.bind(createdWriter);
+      return createdWriter;
+    }
+    throw new IllegalArgumentException(
+        "Only support " + ProxyRecoverableOutputStream.class.getName() + "."
+    );
+  }
+
+  /** Creates a writer bound to an already existing write partition. */
+  @Override
+  public CarbonWriter create(final String partition) throws IOException {
+    this.setCarbonProperties();
+    return this.create0(partition);
+  }
+
+  /** Builds a writer for a fresh write partition. */
+  protected abstract CarbonWriter create0() throws IOException;
+
+  /** Builds a writer for the given, already existing, write partition. */
+  protected abstract CarbonWriter create0(String partition) throws IOException;
+
+  /** Loads the target table from the table path in this factory's configuration. */
+  protected CarbonTable getTable() throws IOException {
+    final Configuration factoryConfiguration = this.getConfiguration();
+    final String tableName = factoryConfiguration.getTableName();
+    final String databaseName = factoryConfiguration.getDatabaseName();
+    final String tablePath = factoryConfiguration.getTablePath();
+    return CarbonTable.buildFromTablePath(tableName, databaseName, tablePath, null);
+  }
+
+  /** Copies every configured carbon property into the shared {@link CarbonProperties}. */
+  private void setCarbonProperties() {
+    final CarbonProperties target = CarbonProperties.getInstance();
+    final java.util.Properties source = this.getConfiguration().getCarbonProperties();
+    for (final String key : source.stringPropertyNames()) {
+      target.addProperty(key, source.getProperty(key));
+    }
+  }
+
+}
diff --git a/integration/flink/src/main/java/org/apache/carbon/flink/CarbonWriterFactoryBuilder.java
b/integration/flink/src/main/java/org/apache/carbon/flink/CarbonWriterFactoryBuilder.java new file mode 100644 index 0000000..28d2225 --- /dev/null +++ b/integration/flink/src/main/java/org/apache/carbon/flink/CarbonWriterFactoryBuilder.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.carbon.flink;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+import java.util.ServiceLoader;
+
+import org.apache.carbondata.common.logging.LogServiceFactory;
+
+import org.apache.log4j.Logger;
+
+/**
+ * Builder of {@link CarbonWriterFactory} instances.
+ * <p>
+ * Implementations are discovered with {@link ServiceLoader} and indexed by
+ * {@link #getType()}. Use {@link #get(String)} to look one up.
+ */
+public abstract class CarbonWriterFactoryBuilder {
+
+  private static final Logger LOGGER =
+      LogServiceFactory.getLogService(CarbonWriterFactoryBuilder.class.getName());
+
+  private static final Map<String, CarbonWriterFactoryBuilder> BUILDER_MAP;
+
+  static {
+    final Map<String, CarbonWriterFactoryBuilder> builderMap = new HashMap<>();
+    final ServiceLoader<CarbonWriterFactoryBuilder> builderLoader =
+        ServiceLoader.load(CarbonWriterFactoryBuilder.class);
+    for (CarbonWriterFactoryBuilder builder : builderLoader) {
+      builderMap.put(builder.getType(), builder);
+      if (LOGGER.isDebugEnabled()) {
+        LOGGER.debug("Added carbon writer factory builder. " + builder.getClass().getName());
+      }
+    }
+    BUILDER_MAP = Collections.unmodifiableMap(builderMap);
+  }
+
+  /**
+   * Returns the builder for the given writer type.
+   * <p>
+   * Lookup order:
+   * <ol>
+   *   <li>an exact match among the discovered builders;</li>
+   *   <li>a case-insensitive match among the discovered builders;</li>
+   *   <li>the built-in S3 or Local builder, matched case-insensitively, for when service
+   *       discovery did not find them.</li>
+   * </ol>
+   *
+   * @param type writer type, for example "Local" or "S3"
+   * @return the matching builder, or {@code null} if no builder matches
+   * @throws IllegalArgumentException if {@code type} is null
+   */
+  public static CarbonWriterFactoryBuilder get(final String type) {
+    if (type == null) {
+      throw new IllegalArgumentException("Argument [type] is null.");
+    }
+    final CarbonWriterFactoryBuilder builder = BUILDER_MAP.get(type);
+    if (builder != null) {
+      return builder;
+    }
+    for (Map.Entry<String, CarbonWriterFactoryBuilder> entry : BUILDER_MAP.entrySet()) {
+      // equalsIgnoreCase(null) is false, so a builder reporting a null type is skipped.
+      if (type.equalsIgnoreCase(entry.getKey())) {
+        return entry.getValue();
+      }
+    }
+    if (type.equalsIgnoreCase(CarbonS3WriterFactoryBuilder.TYPE)) {
+      return new CarbonS3WriterFactoryBuilder();
+    }
+    if (type.equalsIgnoreCase(CarbonLocalWriterFactoryBuilder.TYPE)) {
+      return new CarbonLocalWriterFactoryBuilder();
+    }
+    return null;
+  }
+
+  /** Type name under which this builder is looked up. */
+  public abstract String getType();
+
+  /** Creates a writer factory for the given table and property sets. */
+  public abstract CarbonWriterFactory build(
+      String databaseName,
+      String tableName,
+      String tablePath,
+      Properties tableProperties,
+      Properties writerProperties,
+      Properties carbonProperties
+  );
+
+}
diff --git a/integration/flink/src/main/resources/META-INF/services/org.apache.carbon.flink.CarbonWriterFactoryBuilder b/integration/flink/src/main/resources/META-INF/services/org.apache.carbon.flink.CarbonWriterFactoryBuilder
new file mode 100644
index 0000000..5c8666b
--- /dev/null
+++
b/integration/flink/src/main/resources/META-INF/services/org.apache.carbon.flink.CarbonWriterFactoryBuilder @@ -0,0 +1,2 @@ +org.apache.carbon.flink.CarbonLocalWriterFactoryBuilder +org.apache.carbon.flink.CarbonS3WriterFactoryBuilder \ No newline at end of file diff --git a/integration/flink/src/test/scala/org/apache/carbon/flink/TestCarbonWriter.scala b/integration/flink/src/test/scala/org/apache/carbon/flink/TestCarbonWriter.scala new file mode 100644 index 0000000..55085f8 --- /dev/null +++ b/integration/flink/src/test/scala/org/apache/carbon/flink/TestCarbonWriter.scala @@ -0,0 +1,124 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.carbon.flink
+
+import java.util.Properties
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.flink.api.common.restartstrategy.RestartStrategies
+import org.apache.flink.core.fs.Path
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
+import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.test.util.QueryTest
+
+import org.junit.Test
+
+/**
+ * End-to-end test: streams JSON rows through a Flink StreamingFileSink backed by the
+ * "Local" carbon writer factory.
+ */
+class TestCarbonWriter extends QueryTest {
+
+  val tableName = "test_flink"
+
+  @Test
+  def testLocal(): Unit = {
+    sql(s"drop table if exists $tableName").collect()
+    sql(
+      s"""
+         | CREATE TABLE $tableName (stringField string, intField int, shortField short)
+         | STORED AS carbondata
+      """.stripMargin
+    ).collect()
+
+    try {
+      val rootPath = System.getProperty("user.dir") + "/target/test-classes"
+
+      val dataTempPath = rootPath + "/data/temp/"
+      val dataPath = rootPath + "/data/"
+
+      val tablePath = storeLocation + "/" + tableName + "/"
+
+      val writerProperties = newWriterProperties(dataTempPath, dataPath, storeLocation)
+      val carbonProperties = newCarbonProperties(storeLocation)
+
+      val environment = StreamExecutionEnvironment.getExecutionEnvironment
+      environment.setParallelism(1)
+      environment.enableCheckpointing(2000L)
+      environment.setRestartStrategy(RestartStrategies.noRestart)
+
+      val dataCount = 10000
+      val source = new TestSource(dataCount) {
+        @throws[InterruptedException]
+        override def get(index: Int): String = {
+          Thread.sleep(1L)
+          "{\"stringField\": \"test" + index + "\", \"intField\": " + index + ", \"shortField\": 12345}"
+        }
+
+        // Keep the job alive long enough for a final checkpoint to commit the written data.
+        @throws[InterruptedException]
+        override def onFinish(): Unit = {
+          Thread.sleep(10000L)
+        }
+      }
+      val stream = environment.addSource(source)
+      val factory = CarbonWriterFactory.builder("Local").build(
+        "default",
+        tableName,
+        tablePath,
+        new Properties,
+        writerProperties,
+        carbonProperties
+      )
+      val streamSink = StreamingFileSink.forBulkFormat(new Path(ProxyFileSystem.DEFAULT_URI), factory).build
+
+      stream.addSink(streamSink)
+
+      // Let any job failure propagate unchanged so the test reports the real cause.
+      environment.execute
+
+      // Presumably the writer only stages data files (nothing is loaded into a segment),
+      // so the table is expected to still be empty.
+      checkAnswer(sql(s"select count(1) from $tableName"), Seq(Row(0)))
+    } finally {
+      sql(s"drop table if exists $tableName").collect()
+    }
+  }
+
+  private def newWriterProperties(
+      dataTempPath: String,
+      dataPath: String,
+      storeLocation: String) = {
+    val properties = new Properties
+    properties.setProperty(CarbonLocalProperty.DATA_TEMP_PATH, dataTempPath)
+    properties.setProperty(CarbonLocalProperty.DATA_PATH, dataPath)
+    properties.setProperty(CarbonCommonConstants.STORE_LOCATION, storeLocation)
+    properties.setProperty(CarbonCommonConstants.UNSAFE_WORKING_MEMORY_IN_MB, "1024")
+    properties
+  }
+
+  private def newCarbonProperties(storeLocation: String) = {
+    val properties = new Properties
+    properties.setProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT,
+      CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT)
+    properties.setProperty(CarbonCommonConstants.CARBON_DATE_FORMAT,
+      CarbonCommonConstants.CARBON_DATE_DEFAULT_FORMAT)
+    properties.setProperty(CarbonCommonConstants.STORE_LOCATION, storeLocation)
+    properties.setProperty(CarbonCommonConstants.UNSAFE_WORKING_MEMORY_IN_MB, "1024")
+    properties
+  }
+
+}
diff --git a/integration/flink/src/test/scala/org/apache/carbon/flink/TestSource.scala b/integration/flink/src/test/scala/org/apache/carbon/flink/TestSource.scala
new file mode 100644
index 0000000..88ac173
--- /dev/null
+++ b/integration/flink/src/test/scala/org/apache/carbon/flink/TestSource.scala
@@ -0,0 +1,52 @@
+package org.apache.carbon.flink
+
+import org.apache.flink.api.common.state.{ListState, ListStateDescriptor}
+import org.apache.flink.runtime.state.{FunctionInitializationContext, FunctionSnapshotContext}
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction
+import org.apache.flink.streaming.api.functions.source.SourceFunction + +abstract class TestSource(val dataCount: Int) extends SourceFunction[String] with CheckpointedFunction { // emits get(0) .. get(dataCount - 1); the next index is checkpointed and restored + private var dataIndex = 0 + private var dataIndexState: ListState[Integer] = _ + @volatile private var running = false // cancel() may be invoked from another thread while run() is looping + + @throws[Exception] + def get(index: Int): String + + @throws[Exception] + def onFinish(): Unit = { + // to do nothing. + } + + @throws[Exception] + override def run(sourceContext: SourceFunction.SourceContext[String]): Unit = { + this.running = true + while (this.running && this.dataIndex < this.dataCount) { + sourceContext.getCheckpointLock.synchronized { // emit and advance the index atomically with respect to snapshotState + sourceContext.collectWithTimestamp(this.get(this.dataIndex), System.currentTimeMillis) + this.dataIndex += 1 + } + } + this.onFinish() + } + + override def cancel(): Unit = { + this.running = false + } + + @throws[Exception] + override def snapshotState(context: FunctionSnapshotContext): Unit = { + this.dataIndexState.clear() + this.dataIndexState.add(this.dataIndex) + } + + @throws[Exception] + override def initializeState(context: FunctionInitializationContext): Unit = { + this.dataIndexState = context.getOperatorStateStore.getListState(new ListStateDescriptor[Integer]("dataIndex", classOf[Integer])) + if (!context.isRestored) return + import scala.collection.JavaConversions._ + for (dataIndex <- this.dataIndexState.get) { + this.dataIndex = dataIndex + } + } +} \ No newline at end of file diff --git a/pom.xml b/pom.xml index 9b5f098..e2f5067 100644 --- a/pom.xml +++ b/pom.xml @@ -113,6 +113,8 @@ <module>datamap/mv/core</module> <module>examples/spark2</module> <module>integration/hive</module> + <module>integration/flink</module> + <module>integration/flink-proxy</module> <module>integration/presto</module> <module>examples/flink</module> <module>streaming</module> diff --git a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java index ffad6b1..b1985de
100644 --- a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java +++ b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java @@ -272,6 +272,18 @@ public class CarbonWriterBuilder { } /** + * To support the carbon table for sdk writer + * + * @param table carbon table + * @return CarbonWriterBuilder object + */ + public CarbonWriterBuilder withTable(CarbonTable table) { + Objects.requireNonNull(table, "Table should not be null"); + this.carbonTable = table; + return this; + } + + /** * To support the table properties for sdk writer * * @param options key,value pair of create table properties. @@ -646,7 +658,7 @@ public class CarbonWriterBuilder { public CarbonLoadModel buildLoadModel(Schema carbonSchema) throws IOException, InvalidLoadOptionException { - timestamp = System.nanoTime(); + timestamp = System.currentTimeMillis(); // validate long_string_column Set<String> longStringColumns = new HashSet<>(); if (options != null && options.get(CarbonCommonConstants.LONG_STRING_COLUMNS) != null) {