Repository: incubator-gobblin Updated Branches: refs/heads/master 43d5ed520 -> 1b7748a68
[GOBBLIN-305] Add csv-kafka kafka-hdfs job template Closes #2160 from zxcware/odsc2 Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/1b7748a6 Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/1b7748a6 Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/1b7748a6 Branch: refs/heads/master Commit: 1b7748a688ea5b00b0d8bfb6c8e82d32e635482b Parents: 43d5ed5 Author: zhchen <[email protected]> Authored: Wed Nov 8 16:42:46 2017 -0800 Committer: Hung Tran <[email protected]> Committed: Wed Nov 8 16:42:46 2017 -0800 ---------------------------------------------------------------------- .../converter/csv/CsvToJsonConverterV2.java | 69 ++++++++++++++++-- .../extractor/filebased/FileBasedSource.java | 9 +-- .../test/resources/converter/csv/10_fields.json | 16 ++--- .../converter/csv/11_fields_with_null.json | 16 ++--- .../main/resources/templates/csv-kafka.template | 74 ++++++++++++++++++++ .../resources/templates/kafka-hdfs.template | 69 ++++++++++++++++++ 6 files changed, 229 insertions(+), 24 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/1b7748a6/gobblin-core/src/main/java/org/apache/gobblin/converter/csv/CsvToJsonConverterV2.java ---------------------------------------------------------------------- diff --git a/gobblin-core/src/main/java/org/apache/gobblin/converter/csv/CsvToJsonConverterV2.java b/gobblin-core/src/main/java/org/apache/gobblin/converter/csv/CsvToJsonConverterV2.java index b4829f0..e69f277 100644 --- a/gobblin-core/src/main/java/org/apache/gobblin/converter/csv/CsvToJsonConverterV2.java +++ b/gobblin-core/src/main/java/org/apache/gobblin/converter/csv/CsvToJsonConverterV2.java @@ -30,6 +30,7 @@ import com.google.gson.JsonElement; import com.google.gson.JsonNull; import com.google.gson.JsonObject; import com.google.gson.JsonParser; +import com.google.gson.JsonPrimitive; import org.apache.gobblin.configuration.WorkUnitState; import org.apache.gobblin.converter.Converter; @@ -69,6 +70,8 @@ public class CsvToJsonConverterV2 extends Converter<String, JsonArray, String[], private static final Logger LOG = LoggerFactory.getLogger(CsvToJsonConverterV2.class); private static final JsonParser JSON_PARSER = new JsonParser(); private static final String COLUMN_NAME_KEY = "columnName"; + private static final String DATA_TYPE_KEY = "dataType"; + private static final String TYPE = "type"; private static final String JSON_NULL_VAL = "null"; public static final String CUSTOM_ORDERING = "converter.csv_to_json.custom_order"; @@ -173,12 +176,13 @@ public class CsvToJsonConverterV2 extends Converter<String, JsonArray, String[], JsonObject outputRecord = new JsonObject(); for (int i = 0; i < outputSchema.size(); i++) { - String key = outputSchema.get(i).getAsJsonObject().get(COLUMN_NAME_KEY).getAsString(); + JsonObject field = outputSchema.get(i).getAsJsonObject(); + String key = field.get(COLUMN_NAME_KEY).getAsString(); if (StringUtils.isEmpty(inputRecord[i]) || JSON_NULL_VAL.equalsIgnoreCase(inputRecord[i])) { outputRecord.add(key, JsonNull.INSTANCE); } else { - outputRecord.addProperty(key, inputRecord[i]); + outputRecord.add(key, convertValue(inputRecord[i], field.getAsJsonObject(DATA_TYPE_KEY))); } } @@ -195,16 +199,73 @@ public class CsvToJsonConverterV2 extends Converter<String, JsonArray, String[], Iterator<String> customOrderIterator = customOrder.iterator(); while(outputSchemaIterator.hasNext() && customOrderIterator.hasNext()) { - String key = outputSchemaIterator.next().getAsJsonObject().get(COLUMN_NAME_KEY).getAsString(); + JsonObject field = outputSchemaIterator.next().getAsJsonObject(); + String key = field.get(COLUMN_NAME_KEY).getAsString(); int i = Integer.parseInt(customOrderIterator.next()); Preconditions.checkArgument(i < inputRecord.length, "Index out of bound detected in customer order. Index: " + i + " , # of CSV columns: " + inputRecord.length); if (i < 0 || null == inputRecord[i] || JSON_NULL_VAL.equalsIgnoreCase(inputRecord[i])) { outputRecord.add(key, JsonNull.INSTANCE); continue; } - outputRecord.addProperty(key, inputRecord[i]); + outputRecord.add(key, convertValue(inputRecord[i], field.getAsJsonObject(DATA_TYPE_KEY))); } return outputRecord; } + + /** + * Convert string value to the expected type + */ + private JsonElement convertValue(String value, JsonObject dataType) { + if (dataType == null || !dataType.has(TYPE)) { + return new JsonPrimitive(value); + } + + String type = dataType.get(TYPE).getAsString().toUpperCase(); + ValueType valueType = ValueType.valueOf(type); + return valueType.convert(value); + } + + /** + * An enum of type conversions from string value + */ + private enum ValueType { + INT { + @Override + JsonElement convert(String value) { + return new JsonPrimitive(Double.valueOf(value).intValue()); + } + }, + LONG { + @Override + JsonElement convert(String value) { + return new JsonPrimitive(Double.valueOf(value).longValue()); + } + }, + FLOAT { + @Override + JsonElement convert(String value) { + return new JsonPrimitive(Double.valueOf(value).floatValue()); + } + }, + DOUBLE { + @Override + JsonElement convert(String value) { + return new JsonPrimitive(Double.valueOf(value)); + } + }, + BOOLEAN { + @Override + JsonElement convert(String value) { + return new JsonPrimitive(Boolean.valueOf(value)); + } + }, + STRING { + @Override + JsonElement convert(String value) { + return new JsonPrimitive(value); + } + }; + abstract JsonElement convert(String value); + } } http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/1b7748a6/gobblin-core/src/main/java/org/apache/gobblin/source/extractor/filebased/FileBasedSource.java ---------------------------------------------------------------------- diff --git a/gobblin-core/src/main/java/org/apache/gobblin/source/extractor/filebased/FileBasedSource.java b/gobblin-core/src/main/java/org/apache/gobblin/source/extractor/filebased/FileBasedSource.java index a26b052..d693f44 100644 --- a/gobblin-core/src/main/java/org/apache/gobblin/source/extractor/filebased/FileBasedSource.java +++ b/gobblin-core/src/main/java/org/apache/gobblin/source/extractor/filebased/FileBasedSource.java @@ -215,10 +215,11 @@ public abstract class FileBasedSource<S, D> extends AbstractSource<S, D> { results = this.fsHelper.ls(path); for (int i = 0; i < results.size(); i++) { URI uri = new URI(results.get(i)); - File file = uri.isAbsolute()? - new File(uri) : new File(state.getProp(ConfigurationKeys.SOURCE_FILEBASED_DATA_DIRECTORY), uri.toString()); - - String filePath = file.getAbsolutePath(); + String filePath = uri.toString(); + if (!uri.isAbsolute()) { + File file = new File(state.getProp(ConfigurationKeys.SOURCE_FILEBASED_DATA_DIRECTORY), uri.toString()); + filePath = file.getAbsolutePath(); + } results.set(i, filePath + this.splitPattern + this.fsHelper.getFileMTime(filePath)); } } catch (FileBasedHelperException | URISyntaxException e) { http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/1b7748a6/gobblin-core/src/test/resources/converter/csv/10_fields.json ---------------------------------------------------------------------- diff --git a/gobblin-core/src/test/resources/converter/csv/10_fields.json b/gobblin-core/src/test/resources/converter/csv/10_fields.json index c59d524..ccc1349 100644 --- a/gobblin-core/src/test/resources/converter/csv/10_fields.json +++ b/gobblin-core/src/test/resources/converter/csv/10_fields.json @@ -1,12 +1,12 @@ { "Date": "20160924", "DeviceCategory": "desktop", - "Sessions": "42935", - "BounceRate": "0.0446255968324211", - "AvgSessionDuration": "1590.4702457202748", - "Pageviews": "348380", - "PageviewsPerSession": "8.1141260044252945", - "UniquePageviews": "232467", - "AvgTimeOnPage": "206.98603475430664", - "User_count": "33028" + "Sessions": 42935, + "BounceRate": 0.0446255968324211, + "AvgSessionDuration": 1590.4702457202748, + "Pageviews": 348380, + "PageviewsPerSession": 8.1141260044252945, + "UniquePageviews": 232467, + "AvgTimeOnPage": 206.98603475430664, + "User_count": 33028 } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/1b7748a6/gobblin-core/src/test/resources/converter/csv/11_fields_with_null.json ---------------------------------------------------------------------- diff --git a/gobblin-core/src/test/resources/converter/csv/11_fields_with_null.json b/gobblin-core/src/test/resources/converter/csv/11_fields_with_null.json index 178de16..f4c0637 100644 --- a/gobblin-core/src/test/resources/converter/csv/11_fields_with_null.json +++ b/gobblin-core/src/test/resources/converter/csv/11_fields_with_null.json @@ -2,12 +2,12 @@ "Date": "20160924", "DeviceCategory": "desktop", "Segment": null, - "Sessions": "42935", - "BounceRate": "0.0446255968324211", - "AvgSessionDuration": "1590.4702457202748", - "Pageviews": "348380", - "PageviewsPerSession": "8.1141260044252945", - "UniquePageviews": "232467", - "AvgTimeOnPage": "206.98603475430664", - "User_count": "33028" + "Sessions": 42935, + "BounceRate": 0.0446255968324211, + "AvgSessionDuration": 1590.4702457202748, + "Pageviews": 348380, + "PageviewsPerSession": 8.1141260044252945, + "UniquePageviews": 232467, + "AvgTimeOnPage": 206.98603475430664, + "User_count": 33028 } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/1b7748a6/gobblin-runtime/src/main/resources/templates/csv-kafka.template ---------------------------------------------------------------------- diff --git a/gobblin-runtime/src/main/resources/templates/csv-kafka.template b/gobblin-runtime/src/main/resources/templates/csv-kafka.template new file mode 100644 index 0000000..d85ce1a --- /dev/null +++ b/gobblin-runtime/src/main/resources/templates/csv-kafka.template @@ -0,0 +1,74 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +# ==================================================================== +# Job configurations +# ==================================================================== + +# Required configuration constraints +gobblin.template.required_attributes="input.fs.uri,input.dir,job.work.dir,state.store.fs.uri,writer.fs.uri,csv.schema.json,csv.kafka.brokers,csv.kafka.topic" + +# Job info +job.name=CsvKafka +job.lock.enabled=false + +# Gobblin data storage configurations +state.store.dir=${job.work.dir}/state-store +task.data.root.dir=${job.work.dir}/task-data +writer.staging.dir=${job.work.dir}/task-staging +writer.output.dir=${job.work.dir}/task-output +data.publisher.final.dir=${job.work.dir}/job-output + +# Gobblin MapReduce configurations +mr.job.root.dir=${job.work.dir}/working +mr.job.lock.dir=${job.work.dir}/locks + +# Source +source.class=org.apache.gobblin.source.extractor.filebased.TextFileBasedSource +source.filebased.downloader.class=org.apache.gobblin.source.extractor.filebased.CsvFileDownloader +source.schema=${csv.schema.json} + +## Skip header +source.skip.first.record=true +source.filebased.fs.uri=${input.fs.uri} +source.filebased.data.directory=${input.dir} +source.filebased.maxFilesPerRun=1 +source.max.number.of.partitions=3 + +## Extract +extract.namespace=data +extract.table.name=csv +extract.table.type=SNAPSHOT_APPEND + +# Task execution configurations +taskexecutor.threadpool.size=4 +taskretry.threadpool.coresize=1 +taskretry.threadpool.maxsize=1 + +# Converter +converter.classes=org.apache.gobblin.converter.csv.CsvToJsonConverterV2 + +# Writer +writer.destination.type=KAFKA +writer.output.format=json +writer.builder.class=org.apache.gobblin.kafka.writer.Kafka09JsonObjectWriterBuilder + +writer.kafka.topic=${csv.kafka.topic} +writer.kafka.producerConfig.value.serializer=org.apache.kafka.common.serialization.StringSerializer +writer.kafka.producerConfig.bootstrap.servers=${csv.kafka.brokers} +writer.kafka.producerConfig.retries=3 +writer.kafka.producerConfig.client.id=CsvKafka \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/1b7748a6/gobblin-runtime/src/main/resources/templates/kafka-hdfs.template ---------------------------------------------------------------------- diff --git a/gobblin-runtime/src/main/resources/templates/kafka-hdfs.template b/gobblin-runtime/src/main/resources/templates/kafka-hdfs.template new file mode 100644 index 0000000..773f848 --- /dev/null +++ b/gobblin-runtime/src/main/resources/templates/kafka-hdfs.template @@ -0,0 +1,69 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +# ==================================================================== +# Job configurations +# ==================================================================== + +# Required configuration constraints +gobblin.template.required_attributes="job.work.dir,state.store.fs.uri,writer.fs.uri,kafka.schema.json,kafka.brokers,kafka.topics" + +# Job info +job.name=KafkaHDFS +job.lock.enabled=false + +# Gobblin data storage configurations +state.store.dir=${job.work.dir}/state-store +task.data.root.dir=${job.work.dir}/task-data +writer.staging.dir=${job.work.dir}/task-staging +writer.output.dir=${job.work.dir}/task-output +data.publisher.final.dir=${job.work.dir}/job-output + +# Gobblin MapReduce configurations +mr.job.root.dir=${job.work.dir}/working +mr.job.lock.dir=${job.work.dir}/locks + +# Source +source.class=org.apache.gobblin.source.extractor.extract.kafka.Kafka09JsonSource +source.kafka.json.schema=${kafka.schema.json} + +topic.whitelist=${kafka.topics} +org.apache.gobblin.kafka.consumerClient.class="org.apache.gobblin.kafka.client.Kafka09ConsumerClient$Factory" +bootstrap.with.offset=earliest +kafka.workunit.packer.type=SINGLE_LEVEL +mr.job.max.mappers=10 + +## Extract +extract.namespace=kafka +extract.table.type=SNAPSHOT_APPEND +extract.limit.type=time +extract.limit.enabled=true +extract.limit.timeLimit=5 +extract.limit.timeLimitTimeunit=minutes + +# Task execution configurations +taskexecutor.threadpool.size=4 +taskretry.threadpool.coresize=1 +taskretry.threadpool.maxsize=1 + +# Converter +converter.classes=org.apache.gobblin.converter.avro.JsonIntermediateToAvroConverter + +# Writer +writer.destination.type=HDFS +writer.output.format=AVRO +writer.builder.class=org.apache.gobblin.writer.AvroDataWriterBuilder \ No newline at end of file
