Repository: carbondata Updated Branches: refs/heads/master fae457a35 -> 242c08be4
[CARBONDATA-2384] SDK support write/read data into/from S3 User can set his credential in SDK and use SDK to write data into S3 and read data from S3. This closes #2226 Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/242c08be Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/242c08be Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/242c08be Branch: refs/heads/master Commit: 242c08be452ada59397ceaa906a568b35825b5f3 Parents: fae457a Author: xubo245 <[email protected]> Authored: Wed Apr 25 15:09:44 2018 +0800 Committer: Jacky Li <[email protected]> Committed: Fri Apr 27 18:16:22 2018 +0800 ---------------------------------------------------------------------- .../carbondata/examples/sdk/SDKS3Example.java | 103 +++++++++++++++++++ store/sdk/pom.xml | 5 + .../sdk/file/CarbonReaderBuilder.java | 69 +++++++++++++ .../sdk/file/CarbonWriterBuilder.java | 69 +++++++++++++ 4 files changed, 246 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/carbondata/blob/242c08be/examples/spark2/src/main/java/org/apache/carbondata/examples/sdk/SDKS3Example.java ---------------------------------------------------------------------- diff --git a/examples/spark2/src/main/java/org/apache/carbondata/examples/sdk/SDKS3Example.java b/examples/spark2/src/main/java/org/apache/carbondata/examples/sdk/SDKS3Example.java new file mode 100644 index 0000000..60aa1f8 --- /dev/null +++ b/examples/spark2/src/main/java/org/apache/carbondata/examples/sdk/SDKS3Example.java @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.examples.sdk; + +import org.apache.carbondata.common.logging.LogService; +import org.apache.carbondata.common.logging.LogServiceFactory; +import org.apache.carbondata.core.metadata.datatype.DataTypes; +import org.apache.carbondata.sdk.file.*; + +/** + * Example for testing CarbonWriter on S3 + */ +public class SDKS3Example { + public static void main(String[] args) throws Exception { + LogService logger = LogServiceFactory.getLogService(SDKS3Example.class.getName()); + if (args == null || args.length < 3) { + logger.error("Usage: java CarbonS3Example: <access-key> <secret-key>" + + "<s3-endpoint> [table-path-on-s3] [persistSchema] [transactionalTable]"); + System.exit(0); + } + + String path = "s3a://sdk/WriterOutput"; + if (args.length > 3) { + path=args[3]; + } + + int num = 3; + if (args.length > 4) { + num = Integer.parseInt(args[4]); + } + + Boolean persistSchema = true; + if (args.length > 5) { + if (args[5].equalsIgnoreCase("true")) { + persistSchema = true; + } else { + persistSchema = false; + } + } + + Boolean transactionalTable = true; + if (args.length > 6) { + if (args[6].equalsIgnoreCase("true")) { + transactionalTable = true; + } else { + transactionalTable = false; + } + } + + Field[] fields = new Field[2]; + fields[0] = new Field("name", DataTypes.STRING); + fields[1] = new Field("age", DataTypes.INT); + CarbonWriterBuilder builder = 
CarbonWriter.builder() + .withSchema(new Schema(fields)) + .setAccessKey(args[0]) + .setSecretKey(args[1]) + .setEndPoint(args[2]) + .outputPath(path) + .persistSchemaFile(persistSchema) + .isTransactionalTable(transactionalTable); + + CarbonWriter writer = builder.buildWriterForCSVInput(); + + for (int i = 0; i < num; i++) { + writer.write(new String[]{"robot" + (i % 10), String.valueOf(i)}); + } + writer.close(); + // Read data + CarbonReader reader = CarbonReader + .builder(path, "_temp") + .projection(new String[]{"name", "age"}) + .setAccessKey(args[0]) + .setSecretKey(args[1]) + .setEndPoint(args[2]) + .build(); + + System.out.println("\nData:"); + int i = 0; + while (i < 20 && reader.hasNext()) { + Object[] row = (Object[]) reader.readNextRow(); + System.out.println(row[0] + " " + row[1]); + i++; + } + System.out.println("\nFinished"); + // TODO + // reader.close(); + } +} http://git-wip-us.apache.org/repos/asf/carbondata/blob/242c08be/store/sdk/pom.xml ---------------------------------------------------------------------- diff --git a/store/sdk/pom.xml b/store/sdk/pom.xml index a7869e4..af0d079 100644 --- a/store/sdk/pom.xml +++ b/store/sdk/pom.xml @@ -40,6 +40,11 @@ <artifactId>scalatest_${scala.binary.version}</artifactId> <scope>test</scope> </dependency> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-aws</artifactId> + <version>${hadoop.version}</version> + </dependency> </dependencies> <build> http://git-wip-us.apache.org/repos/asf/carbondata/blob/242c08be/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonReaderBuilder.java ---------------------------------------------------------------------- diff --git a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonReaderBuilder.java b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonReaderBuilder.java index 7f00b49..9560ef7 100644 --- a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonReaderBuilder.java +++ 
b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonReaderBuilder.java @@ -24,12 +24,14 @@ import java.util.Objects; import org.apache.carbondata.common.annotations.InterfaceAudience; import org.apache.carbondata.common.annotations.InterfaceStability; +import org.apache.carbondata.core.datastore.impl.FileFactory; import org.apache.carbondata.core.metadata.schema.table.CarbonTable; import org.apache.carbondata.core.scan.expression.Expression; import org.apache.carbondata.hadoop.CarbonProjection; import org.apache.carbondata.hadoop.api.CarbonFileInputFormat; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.s3a.Constants; import org.apache.hadoop.mapreduce.InputSplit; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.JobID; @@ -64,6 +66,73 @@ public class CarbonReaderBuilder { return this; } + /** + * Set the access key for S3 + * + * @param key the string of access key for different S3 type,like: fs.s3a.access.key + * @param value the value of access key + * @return CarbonReaderBuilder + */ + public CarbonReaderBuilder setAccessKey(String key, String value) { + FileFactory.getConfiguration().set(key, value); + return this; + } + + /** + * Set the access key for S3. 
+ * + * @param value the value of access key + * @return CarbonReaderBuilder + */ + public CarbonReaderBuilder setAccessKey(String value) { + return setAccessKey(Constants.ACCESS_KEY, value); + } + + /** + * Set the secret key for S3 + * + * @param key the string of secret key for different S3 type,like: fs.s3a.secret.key + * @param value the value of secret key + * @return CarbonReaderBuilder + */ + public CarbonReaderBuilder setSecretKey(String key, String value) { + FileFactory.getConfiguration().set(key, value); + return this; + } + + /** + * Set the secret key for S3 + * + * @param value the value of secret key + * @return CarbonReaderBuilder + */ + public CarbonReaderBuilder setSecretKey(String value) { + return setSecretKey(Constants.SECRET_KEY, value); + } + + /** + * Set the endpoint for S3 + * + * @param key the string of endpoint for different S3 type,like: fs.s3a.endpoint + * @param value the value of endpoint + * @return CarbonReaderBuilder + */ + public CarbonReaderBuilder setEndPoint(String key, String value) { + FileFactory.getConfiguration().set(key, value); + return this; + } + + /** + * Set the endpoint for S3 + * + * @param value the value of endpoint + * @return CarbonReaderBuilder + */ + public CarbonReaderBuilder setEndPoint(String value) { + FileFactory.getConfiguration().set(Constants.ENDPOINT, value); + return this; + } + public <T> CarbonReader<T> build() throws IOException, InterruptedException { CarbonTable table = CarbonTable.buildFromTablePath(tableName, tablePath); http://git-wip-us.apache.org/repos/asf/carbondata/blob/242c08be/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java ---------------------------------------------------------------------- diff --git a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java index 3e5f814..5f5ee6f 100644 --- 
a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java +++ b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java @@ -44,6 +44,8 @@ import org.apache.carbondata.core.writer.ThriftWriter; import org.apache.carbondata.processing.loading.model.CarbonLoadModel; import org.apache.carbondata.processing.loading.model.CarbonLoadModelBuilder; +import org.apache.hadoop.fs.s3a.Constants; + /** * Biulder for {@link CarbonWriter} */ @@ -131,6 +133,73 @@ public class CarbonWriterBuilder { } /** + * Set the access key for S3 + * + * @param key the string of access key for different S3 type,like: fs.s3a.access.key + * @param value the value of access key + * @return CarbonWriterBuilder + */ + public CarbonWriterBuilder setAccessKey(String key, String value) { + FileFactory.getConfiguration().set(key, value); + return this; + } + + /** + * Set the access key for S3. + * + * @param value the value of access key + * @return CarbonWriterBuilder + */ + public CarbonWriterBuilder setAccessKey(String value) { + return setAccessKey(Constants.ACCESS_KEY, value); + } + + /** + * Set the secret key for S3 + * + * @param key the string of secret key for different S3 type,like: fs.s3a.secret.key + * @param value the value of secret key + * @return CarbonWriterBuilder + */ + public CarbonWriterBuilder setSecretKey(String key, String value) { + FileFactory.getConfiguration().set(key, value); + return this; + } + + /** + * Set the secret key for S3 + * + * @param value the value of secret key + * @return CarbonWriterBuilder + */ + public CarbonWriterBuilder setSecretKey(String value) { + return setSecretKey(Constants.SECRET_KEY, value); + } + + /** + * Set the endpoint for S3 + * + * @param key the string of endpoint for different S3 type,like: fs.s3a.endpoint + * @param value the value of endpoint + * @return CarbonWriterBuilder + */ + public CarbonWriterBuilder setEndPoint(String key, String value) { + FileFactory.getConfiguration().set(key, 
value); + return this; + } + + /** + * Set the endpoint for S3 + * + * @param value the value of endpoint + * @return CarbonWriterBuilder + */ + public CarbonWriterBuilder setEndPoint(String value) { + FileFactory.getConfiguration().set(Constants.ENDPOINT, value); + return this; + } + + /** * to set the timestamp in the carbondata and carbonindex index files * @param UUID is a timestamp to be used in the carbondata and carbonindex index files * @return updated CarbonWriterBuilder
