Repository: incubator-gobblin Updated Branches: refs/heads/master 278b48d41 -> 90d8495ae
[GOBBLIN-296] Kafka json source and writer Closes #2150 from zxcware/odsc Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/90d8495a Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/90d8495a Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/90d8495a Branch: refs/heads/master Commit: 90d8495ae5b057962a500eaaa78b52a02e07adbe Parents: 278b48d Author: zhchen <[email protected]> Authored: Fri Oct 27 10:50:37 2017 -0700 Committer: Hung Tran <[email protected]> Committed: Fri Oct 27 10:50:37 2017 -0700 ---------------------------------------------------------------------- .../kafka/writer/KafkaDataWriterBuilder.java | 11 +- .../extract/kafka/KafkaGsonDeserializer.java | 20 +-- .../kafka/client/Kafka09ConsumerClient.java | 34 ++--- .../gobblin/kafka/writer/Kafka09DataWriter.java | 13 +- .../writer/Kafka09JsonObjectWriterBuilder.java | 49 +++++++ .../kafka/writer/KafkaDataWriterBuilder.java | 3 +- .../extract/kafka/Kafka09JsonSource.java | 90 ++++++++++++ .../kafka/Kafka09JsonIntegrationTest.java | 140 +++++++++++++++++++ .../kafka/serialize/GsonDeserializerBase.java | 44 ++++++ .../kafka/serialize/GsonSerializerBase.java | 46 ++++++ .../writer/AbstractKafkaDataWriterBuilder.java | 68 +++++++++ .../writer/BaseKafkaDataWriterBuilder.java | 50 +------ .../gobblin/kafka/writer/KafkaWriterHelper.java | 22 +-- .../util/MultiWorkUnitUnpackingIterator.java | 54 +++++-- 14 files changed, 518 insertions(+), 126 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/90d8495a/gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/kafka/writer/KafkaDataWriterBuilder.java ---------------------------------------------------------------------- diff --git a/gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/kafka/writer/KafkaDataWriterBuilder.java 
b/gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/kafka/writer/KafkaDataWriterBuilder.java index 2d8aafc..7adbba1 100644 --- a/gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/kafka/writer/KafkaDataWriterBuilder.java +++ b/gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/kafka/writer/KafkaDataWriterBuilder.java @@ -17,29 +17,20 @@ package org.apache.gobblin.kafka.writer; -import java.io.IOException; import java.util.Properties; import org.apache.avro.Schema; import org.apache.avro.generic.GenericRecord; -import com.typesafe.config.Config; - -import org.apache.gobblin.configuration.State; -import org.apache.gobblin.util.ConfigUtils; import org.apache.gobblin.writer.AsyncDataWriter; -import org.apache.gobblin.writer.DataWriter; -import org.apache.gobblin.writer.DataWriterBuilder; -import org.apache.gobblin.writer.PartitionAwareDataWriterBuilder; /** * Builder that hands back a {@link Kafka08DataWriter} */ -public class KafkaDataWriterBuilder extends BaseKafkaDataWriterBuilder { +public class KafkaDataWriterBuilder extends AbstractKafkaDataWriterBuilder<Schema, GenericRecord> { @Override protected AsyncDataWriter<GenericRecord> getAsyncDataWriter(Properties props) { return new Kafka08DataWriter<>(props); } - } http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/90d8495a/gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaGsonDeserializer.java ---------------------------------------------------------------------- diff --git a/gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaGsonDeserializer.java b/gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaGsonDeserializer.java index e297d96..6556644 100644 --- a/gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaGsonDeserializer.java +++ 
b/gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaGsonDeserializer.java @@ -19,7 +19,6 @@ package org.apache.gobblin.source.extractor.extract.kafka; import java.nio.charset.Charset; import java.nio.charset.StandardCharsets; -import java.util.Map; import org.apache.kafka.common.serialization.Deserializer; @@ -27,30 +26,17 @@ import com.google.common.annotations.VisibleForTesting; import com.google.gson.Gson; import com.google.gson.JsonElement; +import org.apache.gobblin.kafka.serialize.GsonDeserializerBase; + /** * Implementation of {@link Deserializer} that deserializes Kafka data into a {@link JsonElement} using the * {@link StandardCharsets#UTF_8} encoding. */ -public class KafkaGsonDeserializer implements Deserializer<JsonElement> { +public class KafkaGsonDeserializer extends GsonDeserializerBase<JsonElement> implements Deserializer<JsonElement> { private static final Gson GSON = new Gson(); @VisibleForTesting static final Charset CHARSET = StandardCharsets.UTF_8; - - @Override - public void configure(Map<String, ?> configs, boolean isKey) { - // Do nothing - } - - @Override - public JsonElement deserialize(String topic, byte[] data) { - return GSON.fromJson(new String(data, CHARSET), JsonElement.class); - } - - @Override - public void close() { - // Do nothing - } } http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/90d8495a/gobblin-modules/gobblin-kafka-09/src/main/java/org/apache/gobblin/kafka/client/Kafka09ConsumerClient.java ---------------------------------------------------------------------- diff --git a/gobblin-modules/gobblin-kafka-09/src/main/java/org/apache/gobblin/kafka/client/Kafka09ConsumerClient.java b/gobblin-modules/gobblin-kafka-09/src/main/java/org/apache/gobblin/kafka/client/Kafka09ConsumerClient.java index 9b952ab..4943ac5 100644 --- a/gobblin-modules/gobblin-kafka-09/src/main/java/org/apache/gobblin/kafka/client/Kafka09ConsumerClient.java +++ 
b/gobblin-modules/gobblin-kafka-09/src/main/java/org/apache/gobblin/kafka/client/Kafka09ConsumerClient.java @@ -22,12 +22,6 @@ import java.util.Iterator; import java.util.List; import java.util.Map.Entry; import java.util.Properties; -import java.util.regex.Pattern; - -import javax.annotation.Nonnull; - -import lombok.EqualsAndHashCode; -import lombok.ToString; import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerRecord; @@ -39,17 +33,21 @@ import org.apache.kafka.common.TopicPartition; import com.google.common.base.Function; import com.google.common.base.Joiner; import com.google.common.base.Preconditions; -import com.google.common.base.Predicate; import com.google.common.collect.FluentIterable; +import com.google.common.collect.ImmutableMap; import com.google.common.collect.Iterators; import com.google.common.collect.Lists; import com.typesafe.config.Config; +import com.typesafe.config.ConfigFactory; + +import javax.annotation.Nonnull; +import lombok.EqualsAndHashCode; +import lombok.ToString; import org.apache.gobblin.source.extractor.extract.kafka.KafkaOffsetRetrievalFailureException; import org.apache.gobblin.source.extractor.extract.kafka.KafkaPartition; import org.apache.gobblin.source.extractor.extract.kafka.KafkaTopic; import org.apache.gobblin.util.ConfigUtils; -import org.apache.gobblin.util.DatasetFilterUtils; /** @@ -66,38 +64,40 @@ public class Kafka09ConsumerClient<K, V> extends AbstractBaseKafkaConsumerClient private static final String KAFKA_09_CLIENT_SESSION_TIMEOUT_KEY = "session.timeout.ms"; private static final String KAFKA_09_CLIENT_KEY_DESERIALIZER_CLASS_KEY = "key.deserializer"; private static final String KAFKA_09_CLIENT_VALUE_DESERIALIZER_CLASS_KEY = "value.deserializer"; + private static final String KAFKA_09_CLIENT_GROUP_ID = "group.id"; private static final String KAFKA_09_DEFAULT_ENABLE_AUTO_COMMIT = Boolean.toString(false); private static final String KAFKA_09_DEFAULT_KEY_DESERIALIZER = 
"org.apache.kafka.common.serialization.StringDeserializer"; + private static final String KAFKA_09_DEFAULT_GROUP_ID = "kafka09"; public static final String GOBBLIN_CONFIG_KEY_DESERIALIZER_CLASS_KEY = CONFIG_PREFIX + KAFKA_09_CLIENT_KEY_DESERIALIZER_CLASS_KEY; public static final String GOBBLIN_CONFIG_VALUE_DESERIALIZER_CLASS_KEY = CONFIG_PREFIX + KAFKA_09_CLIENT_VALUE_DESERIALIZER_CLASS_KEY; + private static final Config FALLBACK = + ConfigFactory.parseMap(ImmutableMap.<String, Object>builder() + .put(KAFKA_09_CLIENT_ENABLE_AUTO_COMMIT_KEY, KAFKA_09_DEFAULT_ENABLE_AUTO_COMMIT) + .put(KAFKA_09_CLIENT_KEY_DESERIALIZER_CLASS_KEY, KAFKA_09_DEFAULT_KEY_DESERIALIZER) + .put(KAFKA_09_CLIENT_GROUP_ID, KAFKA_09_DEFAULT_GROUP_ID) + .build()); + private final Consumer<K, V> consumer; private Kafka09ConsumerClient(Config config) { super(config); Preconditions.checkArgument(config.hasPath(GOBBLIN_CONFIG_VALUE_DESERIALIZER_CLASS_KEY), - "Missing required property " + GOBBLIN_CONFIG_KEY_DESERIALIZER_CLASS_KEY); - - Config scopedConfig = ConfigUtils.getConfigOrEmpty(config, AbstractBaseKafkaConsumerClient.CONFIG_PREFIX_NO_DOT); + "Missing required property " + GOBBLIN_CONFIG_VALUE_DESERIALIZER_CLASS_KEY); Properties props = new Properties(); props.put(KAFKA_09_CLIENT_BOOTSTRAP_SERVERS_KEY, Joiner.on(",").join(super.brokers)); - props.put(KAFKA_09_CLIENT_ENABLE_AUTO_COMMIT_KEY, KAFKA_09_DEFAULT_ENABLE_AUTO_COMMIT); props.put(KAFKA_09_CLIENT_SESSION_TIMEOUT_KEY, super.socketTimeoutMillis); - props.put(KAFKA_09_CLIENT_KEY_DESERIALIZER_CLASS_KEY, - ConfigUtils.getString(config, GOBBLIN_CONFIG_KEY_DESERIALIZER_CLASS_KEY, KAFKA_09_DEFAULT_KEY_DESERIALIZER)); - props.put(KAFKA_09_CLIENT_VALUE_DESERIALIZER_CLASS_KEY, - config.getString(GOBBLIN_CONFIG_VALUE_DESERIALIZER_CLASS_KEY)); + Config scopedConfig = config.getConfig(CONFIG_PREFIX_NO_DOT).withFallback(FALLBACK); props.putAll(ConfigUtils.configToProperties(scopedConfig)); this.consumer = new KafkaConsumer<>(props); - } public 
Kafka09ConsumerClient(Config config, Consumer<K, V> consumer) { http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/90d8495a/gobblin-modules/gobblin-kafka-09/src/main/java/org/apache/gobblin/kafka/writer/Kafka09DataWriter.java ---------------------------------------------------------------------- diff --git a/gobblin-modules/gobblin-kafka-09/src/main/java/org/apache/gobblin/kafka/writer/Kafka09DataWriter.java b/gobblin-modules/gobblin-kafka-09/src/main/java/org/apache/gobblin/kafka/writer/Kafka09DataWriter.java index 40a8a4c..2cb00e1 100644 --- a/gobblin-modules/gobblin-kafka-09/src/main/java/org/apache/gobblin/kafka/writer/Kafka09DataWriter.java +++ b/gobblin-modules/gobblin-kafka-09/src/main/java/org/apache/gobblin/kafka/writer/Kafka09DataWriter.java @@ -76,15 +76,14 @@ public class Kafka09DataWriter<D> implements AsyncDataWriter<D> { private final Producer<String, D> producer; private final String topic; - public static Producer getKafkaProducer(Properties props) - { + public static Producer getKafkaProducer(Properties props) { Object producerObject = KafkaWriterHelper.getKafkaProducer(props); - try - { + try { Producer producer = (Producer) producerObject; return producer; } catch (ClassCastException e) { - log.error("Failed to instantiate Kafka producer " + producerObject.getClass().getName() + " as instance of Producer.class", e); + log.error("Failed to instantiate Kafka producer " + producerObject.getClass().getName() + + " as instance of Producer.class", e); throw Throwables.propagate(e); } } @@ -93,8 +92,7 @@ public class Kafka09DataWriter<D> implements AsyncDataWriter<D> { this(getKafkaProducer(props), ConfigFactory.parseProperties(props)); } - public Kafka09DataWriter(Producer producer, Config config) - { + public Kafka09DataWriter(Producer producer, Config config) { this.topic = config.getString(KafkaWriterConfigurationKeys.KAFKA_TOPIC); this.producer = producer; } @@ -106,7 +104,6 @@ public class Kafka09DataWriter<D> implements 
AsyncDataWriter<D> { this.producer.close(); } - @Override public Future<WriteResponse> write(final D record, final WriteCallback callback) { return new WriteResponseFuture<>(this.producer.send(new ProducerRecord<String, D>(topic, record), new Callback() { http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/90d8495a/gobblin-modules/gobblin-kafka-09/src/main/java/org/apache/gobblin/kafka/writer/Kafka09JsonObjectWriterBuilder.java ---------------------------------------------------------------------- diff --git a/gobblin-modules/gobblin-kafka-09/src/main/java/org/apache/gobblin/kafka/writer/Kafka09JsonObjectWriterBuilder.java b/gobblin-modules/gobblin-kafka-09/src/main/java/org/apache/gobblin/kafka/writer/Kafka09JsonObjectWriterBuilder.java new file mode 100644 index 0000000..c97c486 --- /dev/null +++ b/gobblin-modules/gobblin-kafka-09/src/main/java/org/apache/gobblin/kafka/writer/Kafka09JsonObjectWriterBuilder.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.gobblin.kafka.writer; + +import java.util.Properties; + +import org.apache.gobblin.kafka.serialize.GsonSerializerBase; +import org.apache.gobblin.writer.AsyncDataWriter; +import org.apache.kafka.common.serialization.Serializer; + +import com.google.gson.JsonArray; +import com.google.gson.JsonObject; + + +/** + * A {@link org.apache.gobblin.writer.DataWriterBuilder} that builds a {@link org.apache.gobblin.writer.DataWriter} to + * write {@link JsonObject} to kafka + */ +public class Kafka09JsonObjectWriterBuilder extends AbstractKafkaDataWriterBuilder<JsonArray, JsonObject> { + private static final String VALUE_SERIALIZER_KEY = + KafkaWriterConfigurationKeys.KAFKA_PRODUCER_CONFIG_PREFIX + KafkaWriterConfigurationKeys.VALUE_SERIALIZER_CONFIG; + + @Override + protected AsyncDataWriter<JsonObject> getAsyncDataWriter(Properties props) { + props.setProperty(VALUE_SERIALIZER_KEY, KafkaGsonObjectSerializer.class.getName()); + return new Kafka09DataWriter<>(props); + } + + /** + * A specific {@link Serializer} that serializes {@link JsonObject} to byte array + */ + public final static class KafkaGsonObjectSerializer extends GsonSerializerBase<JsonObject> implements Serializer<JsonObject> { + } +} http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/90d8495a/gobblin-modules/gobblin-kafka-09/src/main/java/org/apache/gobblin/kafka/writer/KafkaDataWriterBuilder.java ---------------------------------------------------------------------- diff --git a/gobblin-modules/gobblin-kafka-09/src/main/java/org/apache/gobblin/kafka/writer/KafkaDataWriterBuilder.java b/gobblin-modules/gobblin-kafka-09/src/main/java/org/apache/gobblin/kafka/writer/KafkaDataWriterBuilder.java index 754f5a4..8869c43 100644 --- a/gobblin-modules/gobblin-kafka-09/src/main/java/org/apache/gobblin/kafka/writer/KafkaDataWriterBuilder.java +++ b/gobblin-modules/gobblin-kafka-09/src/main/java/org/apache/gobblin/kafka/writer/KafkaDataWriterBuilder.java @@ -19,6 +19,7 @@ package 
org.apache.gobblin.kafka.writer; import java.util.Properties; +import org.apache.avro.Schema; import org.apache.avro.generic.GenericRecord; import org.apache.gobblin.writer.AsyncDataWriter; @@ -27,7 +28,7 @@ import org.apache.gobblin.writer.AsyncDataWriter; /** * Builder that hands back a {@link Kafka09DataWriter} */ -public class KafkaDataWriterBuilder extends BaseKafkaDataWriterBuilder { +public class KafkaDataWriterBuilder extends AbstractKafkaDataWriterBuilder<Schema, GenericRecord> { @Override protected AsyncDataWriter<GenericRecord> getAsyncDataWriter(Properties props) { return new Kafka09DataWriter<>(props); http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/90d8495a/gobblin-modules/gobblin-kafka-09/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/Kafka09JsonSource.java ---------------------------------------------------------------------- diff --git a/gobblin-modules/gobblin-kafka-09/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/Kafka09JsonSource.java b/gobblin-modules/gobblin-kafka-09/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/Kafka09JsonSource.java new file mode 100644 index 0000000..772ade7 --- /dev/null +++ b/gobblin-modules/gobblin-kafka-09/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/Kafka09JsonSource.java @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.source.extractor.extract.kafka; + +import java.io.IOException; +import java.util.List; + +import org.apache.commons.lang.StringUtils; +import org.apache.kafka.common.serialization.Deserializer; + +import com.google.gson.JsonArray; +import com.google.gson.JsonObject; +import com.google.gson.JsonParser; + +import org.apache.gobblin.configuration.SourceState; +import org.apache.gobblin.configuration.WorkUnitState; +import org.apache.gobblin.kafka.client.ByteArrayBasedKafkaRecord; +import org.apache.gobblin.kafka.client.Kafka09ConsumerClient; +import org.apache.gobblin.kafka.serialize.GsonDeserializerBase; +import org.apache.gobblin.source.extractor.Extractor; +import org.apache.gobblin.source.workunit.WorkUnit; + + +/** + * A {@link KafkaSource} that reads kafka record as {@link JsonObject} + */ +public class Kafka09JsonSource extends KafkaSource<JsonArray, JsonObject> { + @Override + public List<WorkUnit> getWorkunits(SourceState state) { + if (!state.contains(Kafka09ConsumerClient.GOBBLIN_CONFIG_VALUE_DESERIALIZER_CLASS_KEY)) { + state.setProp(Kafka09ConsumerClient.GOBBLIN_CONFIG_VALUE_DESERIALIZER_CLASS_KEY, + KafkaGsonDeserializer.class.getName()); + } + return super.getWorkunits(state); + } + + @Override + public Extractor<JsonArray, JsonObject> getExtractor(WorkUnitState state) + throws IOException { + return new JsonExtractor(state); + } + + static final class JsonExtractor extends KafkaExtractor<JsonArray, JsonObject> { + private static final String JSON_SCHEMA = "source.kafka.json.schema"; + private static final 
JsonParser JSON_PARSER = new JsonParser(); + private final JsonArray schema; + + JsonExtractor(WorkUnitState state) { + super(state); + String schemaStr = state.getProp(JSON_SCHEMA); + if (StringUtils.isEmpty(schemaStr)) { + throw new RuntimeException("Missing configuration: " + JSON_SCHEMA); + } + this.schema = JSON_PARSER.parse(schemaStr).getAsJsonArray(); + } + + @Override + public JsonArray getSchema() + throws IOException { + return schema; + } + + @Override + protected JsonObject decodeRecord(ByteArrayBasedKafkaRecord kafkaConsumerRecord) + throws IOException { + throw new UnsupportedOperationException(); + } + } + + /** + * A specific kafka {@link Deserializer} that deserializes record as JsonObject + */ + public static final class KafkaGsonDeserializer extends GsonDeserializerBase<JsonObject> implements Deserializer<JsonObject> { + } +} http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/90d8495a/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/kafka/Kafka09JsonIntegrationTest.java ---------------------------------------------------------------------- diff --git a/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/kafka/Kafka09JsonIntegrationTest.java b/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/kafka/Kafka09JsonIntegrationTest.java new file mode 100644 index 0000000..5cbd4ea --- /dev/null +++ b/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/kafka/Kafka09JsonIntegrationTest.java @@ -0,0 +1,140 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.kafka; + +import java.io.IOException; +import java.lang.management.ManagementFactory; +import java.util.List; + +import org.testng.Assert; +import org.testng.annotations.AfterSuite; +import org.testng.annotations.BeforeSuite; +import org.testng.annotations.Test; + +import com.google.gson.Gson; +import com.google.gson.JsonArray; +import com.google.gson.JsonObject; + +import lombok.extern.slf4j.Slf4j; + +import org.apache.gobblin.configuration.ConfigurationKeys; +import org.apache.gobblin.configuration.SourceState; +import org.apache.gobblin.configuration.WorkUnitState; +import org.apache.gobblin.kafka.client.Kafka09ConsumerClient; +import org.apache.gobblin.kafka.writer.Kafka09JsonObjectWriterBuilder; +import org.apache.gobblin.runtime.util.MultiWorkUnitUnpackingIterator; +import org.apache.gobblin.source.extractor.DataRecordException; +import org.apache.gobblin.source.extractor.Extractor; +import org.apache.gobblin.source.extractor.extract.kafka.Kafka09JsonSource; +import org.apache.gobblin.source.extractor.extract.kafka.KafkaSource; +import org.apache.gobblin.source.workunit.WorkUnit; +import org.apache.gobblin.writer.DataWriter; +import org.apache.gobblin.writer.Destination; + +import static org.apache.gobblin.kafka.writer.KafkaWriterConfigurationKeys.KAFKA_PRODUCER_CONFIG_PREFIX; +import static org.apache.gobblin.kafka.writer.KafkaWriterConfigurationKeys.KAFKA_TOPIC; + + +/** + * An integration test for {@link Kafka09JsonSource} and {@link Kafka09JsonObjectWriterBuilder}. 
The test writes + * a json object to kafka with the writer and extracts it with the source + */ +@Slf4j +public class Kafka09JsonIntegrationTest { + private final Gson gson; + private final KafkaTestBase kafkaTestHelper; + + public Kafka09JsonIntegrationTest() + throws InterruptedException, RuntimeException { + kafkaTestHelper = new KafkaTestBase(); + gson = new Gson(); + } + + @BeforeSuite + public void beforeSuite() { + log.info("Process id = " + ManagementFactory.getRuntimeMXBean().getName()); + kafkaTestHelper.startServers(); + } + + @AfterSuite + public void afterSuite() + throws IOException { + try { + kafkaTestHelper.stopClients(); + } finally { + kafkaTestHelper.stopServers(); + } + } + + private SourceState createSourceState(String topic) { + SourceState state = new SourceState(); + state.setProp(ConfigurationKeys.KAFKA_BROKERS, "localhost:" + kafkaTestHelper.getKafkaServerPort()); + state.setProp(KafkaSource.TOPIC_WHITELIST, topic); + state.setProp(KafkaSource.GOBBLIN_KAFKA_CONSUMER_CLIENT_FACTORY_CLASS, + Kafka09ConsumerClient.Factory.class.getName()); + state.setProp(KafkaSource.BOOTSTRAP_WITH_OFFSET, "earliest"); + return state; + } + + @Test + public void testHappyPath() + throws IOException, DataRecordException { + String topic = "testKafka09JsonSource"; + kafkaTestHelper.provisionTopic(topic); + SourceState state = createSourceState(topic); + + // Produce a record + state.setProp(KAFKA_PRODUCER_CONFIG_PREFIX + "bootstrap.servers", + "localhost:" + kafkaTestHelper.getKafkaServerPort()); + state.setProp(KAFKA_TOPIC, topic); + Destination destination = Destination.of(Destination.DestinationType.KAFKA, state); + Kafka09JsonObjectWriterBuilder writerBuilder = new Kafka09JsonObjectWriterBuilder(); + writerBuilder.writeTo(destination); + DataWriter<JsonObject> writer = writerBuilder.build(); + + final String json = "{\"number\":27}"; + JsonObject record = gson.fromJson(json, JsonObject.class); + writer.write(record); + writer.flush(); + writer.close(); + + 
Kafka09JsonSource source = new Kafka09JsonSource(); + List<WorkUnit> workUnitList = source.getWorkunits(state); + // Test the right value deserializer is set + Assert.assertEquals(state.getProp(Kafka09ConsumerClient.GOBBLIN_CONFIG_VALUE_DESERIALIZER_CLASS_KEY), + Kafka09JsonSource.KafkaGsonDeserializer.class.getName()); + + // Test there is only one non-empty work unit + MultiWorkUnitUnpackingIterator iterator = new MultiWorkUnitUnpackingIterator(workUnitList.iterator()); + Assert.assertTrue(iterator.hasNext()); + WorkUnit workUnit = iterator.next(); + Assert.assertEquals(workUnit.getProp(ConfigurationKeys.EXTRACT_TABLE_NAME_KEY), topic); + Assert.assertFalse(iterator.hasNext()); + + // Test extractor + WorkUnitState workUnitState = new WorkUnitState(workUnit, state); + + final String jsonSchema = + "[{\"columnName\":\"number\",\"comment\":\"\",\"isNullable\":\"false\",\"dataType\":{\"type\":\"int\"}}]"; + workUnitState.setProp("source.kafka.json.schema", jsonSchema); + + Extractor<JsonArray, JsonObject> extractor = source.getExtractor(workUnitState); + Assert.assertEquals(extractor.getSchema().toString(), jsonSchema); + Assert.assertEquals(extractor.readRecord(null).toString(), json); + } +} http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/90d8495a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/serialize/GsonDeserializerBase.java ---------------------------------------------------------------------- diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/serialize/GsonDeserializerBase.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/serialize/GsonDeserializerBase.java new file mode 100644 index 0000000..58c59ee --- /dev/null +++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/serialize/GsonDeserializerBase.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license 
agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.kafka.serialize; + +import java.nio.charset.StandardCharsets; +import java.util.Map; + +import com.google.gson.Gson; +import com.google.gson.JsonElement; + + +/** + * Base kafka Gson deserializer, which deserializes a json string to a {@link JsonElement} + */ +public class GsonDeserializerBase<T extends JsonElement> { + private static final Gson GSON = new Gson(); + + public void configure(Map<String, ?> configs, boolean isKey) { + // Do nothing + } + + public T deserialize(String topic, byte[] data) { + return (T) GSON.fromJson(new String(data, StandardCharsets.UTF_8), JsonElement.class); + } + + public void close() { + // Do nothing + } +} http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/90d8495a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/serialize/GsonSerializerBase.java ---------------------------------------------------------------------- diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/serialize/GsonSerializerBase.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/serialize/GsonSerializerBase.java new file mode 100644 index 0000000..23c4535 --- /dev/null +++ 
b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/serialize/GsonSerializerBase.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.kafka.serialize; + +import java.nio.charset.StandardCharsets; +import java.util.Map; + +import com.google.gson.JsonElement; + + +/** + * Base kafka GSON serializer, which serializes json data into string encoded with + * {@link StandardCharsets#UTF_8} + */ +public class GsonSerializerBase<T extends JsonElement> { + public void configure(Map<String, ?> configs, boolean isKey) { + // Do nothing by default + } + + public byte[] serialize(String topic, T data) { + if (data == null) { + return null; + } else { + return data.toString().getBytes(StandardCharsets.UTF_8); + } + } + + public void close() { + // Nothing to close + } +} http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/90d8495a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/writer/AbstractKafkaDataWriterBuilder.java ---------------------------------------------------------------------- diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/writer/AbstractKafkaDataWriterBuilder.java 
b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/writer/AbstractKafkaDataWriterBuilder.java
new file mode 100644
index 0000000..a3db916
--- /dev/null
+++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/writer/AbstractKafkaDataWriterBuilder.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.kafka.writer;
+
+import java.io.IOException;
+import java.util.Properties;
+
+import com.typesafe.config.Config;
+
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.util.ConfigUtils;
+import org.apache.gobblin.writer.AsyncDataWriter;
+import org.apache.gobblin.writer.AsyncWriterManager;
+import org.apache.gobblin.writer.DataWriter;
+import org.apache.gobblin.writer.DataWriterBuilder;
+
+
+/**
+ * Base builder for Kafka data writers. {@link #build()} hands the {@link AsyncDataWriter} supplied by the
+ * subclass to an {@link AsyncWriterManager} and returns the result as a {@link DataWriter}.
+ *
+ * @param <S> the schema type parameter of {@link DataWriterBuilder}
+ * @param <D> the record type accepted by the built {@link DataWriter}
+ */
+public abstract class AbstractKafkaDataWriterBuilder<S, D> extends DataWriterBuilder<S, D> {
+
+  /**
+   * Supply the {@link AsyncDataWriter} that {@link #build()} passes to the {@link AsyncWriterManager}.
+   *
+   * @param props as called from {@link #build()}: the properties of this builder's destination
+   */
+  protected abstract AsyncDataWriter<D> getAsyncDataWriter(Properties props);
+
+  /**
+   * Build a {@link DataWriter}. Commit timeout, commit step wait time and failure allowance are looked up
+   * in the destination properties, with the {@code *_DEFAULT} constants of
+   * {@link KafkaWriterConfigurationKeys} passed as fallback values; retries are disabled.
+   *
+   * @throws IOException if there is anything wrong building the writer
+   * @return the built {@link DataWriter}
+   */
+  @Override
+  public DataWriter<D> build()
+      throws IOException {
+    State destinationState = this.destination.getProperties();
+    Properties writerProps = destinationState.getProperties();
+    Config writerConfig = ConfigUtils.propertiesToConfig(writerProps);
+
+    long timeoutMillis = ConfigUtils.getLong(writerConfig,
+        KafkaWriterConfigurationKeys.COMMIT_TIMEOUT_MILLIS_CONFIG,
+        KafkaWriterConfigurationKeys.COMMIT_TIMEOUT_MILLIS_DEFAULT);
+    long stepWaitMillis = ConfigUtils.getLong(writerConfig,
+        KafkaWriterConfigurationKeys.COMMIT_STEP_WAIT_TIME_CONFIG,
+        KafkaWriterConfigurationKeys.COMMIT_STEP_WAIT_TIME_DEFAULT);
+    // The setting is expressed in percent; the manager is given a ratio
+    double allowancePct = ConfigUtils.getDouble(writerConfig,
+        KafkaWriterConfigurationKeys.FAILURE_ALLOWANCE_PCT_CONFIG,
+        KafkaWriterConfigurationKeys.FAILURE_ALLOWANCE_PCT_DEFAULT);
+
+    return AsyncWriterManager.builder()
+        .config(writerConfig)
+        .commitTimeoutMillis(timeoutMillis)
+        .commitStepWaitTimeInMillis(stepWaitMillis)
+        .failureAllowanceRatio(allowancePct / 100.0)
+        .retriesEnabled(false)
+        .asyncDataWriter(getAsyncDataWriter(writerProps))
+        .build();
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/90d8495a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/writer/BaseKafkaDataWriterBuilder.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/writer/BaseKafkaDataWriterBuilder.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/writer/BaseKafkaDataWriterBuilder.java
index 8e5e9b0..c5b2f95 100644
--- a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/writer/BaseKafkaDataWriterBuilder.java
+++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/writer/BaseKafkaDataWriterBuilder.java
@@ -17,57 +17,15 @@ package
org.apache.gobblin.kafka.writer;
 
-import java.io.IOException;
-import java.util.Properties;
-
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericRecord;
 
-import com.typesafe.config.Config;
-
-import org.apache.gobblin.configuration.State;
-import org.apache.gobblin.util.ConfigUtils;
-import org.apache.gobblin.writer.AsyncWriterManager;
-import org.apache.gobblin.writer.AsyncDataWriter;
-import org.apache.gobblin.writer.DataWriter;
-import org.apache.gobblin.writer.DataWriterBuilder;
-
 
 /**
  * Base class for creating KafkaDataWriter builders.
+ *
+ * @deprecated Use {@link AbstractKafkaDataWriterBuilder}, which takes the schema and record types as type parameters
  */
-
-public abstract class BaseKafkaDataWriterBuilder extends DataWriterBuilder<Schema, GenericRecord> {
-
-  protected abstract AsyncDataWriter<GenericRecord> getAsyncDataWriter(Properties props);
-
-  /**
-   * Build a {@link DataWriter}.
-   *
-   * @throws IOException if there is anything wrong building the writer
-   * @return the built {@link DataWriter}
-   */
-  @Override
-  public DataWriter<GenericRecord> build()
-      throws IOException {
-    State state = this.destination.getProperties();
-    Properties taskProps = state.getProperties();
-    Config config = ConfigUtils.propertiesToConfig(taskProps);
-    long commitTimeoutMillis = ConfigUtils.getLong(config, KafkaWriterConfigurationKeys.COMMIT_TIMEOUT_MILLIS_CONFIG,
-        KafkaWriterConfigurationKeys.COMMIT_TIMEOUT_MILLIS_DEFAULT);
-    long commitStepWaitTimeMillis = ConfigUtils.getLong(config, KafkaWriterConfigurationKeys.COMMIT_STEP_WAIT_TIME_CONFIG,
-        KafkaWriterConfigurationKeys.COMMIT_STEP_WAIT_TIME_DEFAULT);
-    double failureAllowance = ConfigUtils.getDouble(config, KafkaWriterConfigurationKeys.FAILURE_ALLOWANCE_PCT_CONFIG,
-        KafkaWriterConfigurationKeys.FAILURE_ALLOWANCE_PCT_DEFAULT) / 100.0;
-
-    return AsyncWriterManager.builder()
-        .config(config)
-        .commitTimeoutMillis(commitTimeoutMillis)
-        .commitStepWaitTimeInMillis(commitStepWaitTimeMillis)
-        .failureAllowanceRatio(failureAllowance)
-        .retriesEnabled(false)
-        .asyncDataWriter(getAsyncDataWriter(taskProps))
-        .build();
-  }
-
+@Deprecated
+public abstract class BaseKafkaDataWriterBuilder extends AbstractKafkaDataWriterBuilder<Schema, GenericRecord> {
 }
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/90d8495a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/writer/KafkaWriterHelper.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/writer/KafkaWriterHelper.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/writer/KafkaWriterHelper.java
index 3f52645..28da311 100644
--- a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/writer/KafkaWriterHelper.java
+++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/writer/KafkaWriterHelper.java
@@ -40,8 +40,7 @@ import static org.apache.gobblin.kafka.writer.KafkaWriterConfigurationKeys.CLIEN
 @Slf4j
 public class KafkaWriterHelper {
 
-  static Properties getProducerProperties(Properties props)
-  {
+  static Properties getProducerProperties(Properties props) {
     Properties producerProperties = stripPrefix(props, KAFKA_PRODUCER_CONFIG_PREFIX);
 
     // Provide default properties if not set from above
@@ -52,8 +51,7 @@ public class KafkaWriterHelper {
     return producerProperties;
   }
 
-  private static void setDefaultIfUnset(Properties props, String key, String value)
-  {
+  private static void setDefaultIfUnset(Properties props, String key, String value) {
     if (!props.containsKey(key)) {
       props.setProperty(key, value);
     }
@@ -62,22 +60,18 @@ public class KafkaWriterHelper {
   private static Properties stripPrefix(Properties props, String prefix) {
     Properties strippedProps = new Properties();
     int prefixLength = prefix.length();
-    for (String key: props.stringPropertyNames())
-    {
-      if (key.startsWith(prefix))
-      {
+    for (String key : props.stringPropertyNames()) {
+      if
(key.startsWith(prefix)) {
         strippedProps.setProperty(key.substring(prefixLength), props.getProperty(key));
       }
     }
     return strippedProps;
   }
 
-  public static Object getKafkaProducer(Properties props)
-  {
+  public static Object getKafkaProducer(Properties props) {
     Config config = ConfigFactory.parseProperties(props);
-    String kafkaProducerClass = ConfigUtils
-        .getString(config, KafkaWriterConfigurationKeys.KAFKA_WRITER_PRODUCER_CLASS,
-            KafkaWriterConfigurationKeys.KAFKA_WRITER_PRODUCER_CLASS_DEFAULT);
+    String kafkaProducerClass = ConfigUtils.getString(config, KafkaWriterConfigurationKeys.KAFKA_WRITER_PRODUCER_CLASS,
+        KafkaWriterConfigurationKeys.KAFKA_WRITER_PRODUCER_CLASS_DEFAULT);
     Properties producerProps = getProducerProperties(props);
     try {
       Class<?> producerClass = (Class<?>) Class.forName(kafkaProducerClass);
@@ -88,6 +82,4 @@ public class KafkaWriterHelper {
       throw Throwables.propagate(e);
     }
   }
-
-
 }
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/90d8495a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/util/MultiWorkUnitUnpackingIterator.java
----------------------------------------------------------------------
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/util/MultiWorkUnitUnpackingIterator.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/util/MultiWorkUnitUnpackingIterator.java
index 38e4ce7..348fb28 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/util/MultiWorkUnitUnpackingIterator.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/util/MultiWorkUnitUnpackingIterator.java
@@ -31,29 +31,77 @@ import lombok.RequiredArgsConstructor;
 @RequiredArgsConstructor
 public class MultiWorkUnitUnpackingIterator implements Iterator<WorkUnit> {
   private final Iterator<WorkUnit> workUnits;
+
+  /** The iterator for {@link #nextWu} if it's a {@link MultiWorkUnit} */
   private Iterator<WorkUnit> currentIterator;
+  /** The work unit to be checked in {@link #next()} */
+  private WorkUnit nextWu;
+  /** A flag indicating if a new seek operation will be needed */
+  private boolean needSeek = true;
 
+  /**
+   * @return {@code true} if {@link #seekNext()} located another work unit to hand out
+   */
   @Override
   public boolean hasNext() {
-    return this.workUnits.hasNext() || (this.currentIterator != null && this.currentIterator.hasNext());
+    seekNext();
+    return nextWu != null;
   }
 
+  /**
+   * Return the next work unit: an element of the current {@link MultiWorkUnit} if {@link #nextWu} is one,
+   * otherwise {@link #nextWu} itself.
+   *
+   * @throws java.util.NoSuchElementException if no work unit is left
+   */
   @Override
   public WorkUnit next() {
+    // Seek here as well, in case the caller did not call hasNext() first
+    seekNext();
+    if (nextWu == null) {
+      // Exhausted: signal it as java.util.Iterator requires instead of returning null
+      throw new java.util.NoSuchElementException("No more work units");
+    }
+
+    WorkUnit wu = nextWu;
+    if (nextWu instanceof MultiWorkUnit) {
+      // NOTE(review): the replaced implementation rejected a MultiWorkUnit nested inside another one
+      // with an IllegalStateException; here such an element is returned as-is -- confirm this is intended
+      wu = this.currentIterator.next();
+    }
+    needSeek = true;
+    return wu;
+  }
+
+  /**
+   * Seek to the next available work unit, skipping all empty {@link MultiWorkUnit}s. Does nothing until
+   * {@link #next()} has consumed the previously located work unit.
+   */
+  private void seekNext() {
+    if (!needSeek) {
+      return;
+    }
+
+    // First, keep draining the current MultiWorkUnit while it still has elements
     if (this.currentIterator != null && this.currentIterator.hasNext()) {
-      WorkUnit next = this.currentIterator.next();
-      if (next instanceof MultiWorkUnit) {
-        throw new IllegalStateException("A MultiWorkUnit cannot contain other MultiWorkUnits.");
-      }
-      return next;
+      needSeek = false;
+      return;
     }
-    WorkUnit wu = this.workUnits.next();
-    if (wu instanceof MultiWorkUnit) {
-      this.currentIterator = ((MultiWorkUnit) wu).getWorkUnits().iterator();
-      return next();
-    } else {
-      return wu;
+
+    // Then, find the next available work unit
+    nextWu = null;
+    this.currentIterator = null;
+    while (nextWu == null && workUnits.hasNext()) {
+      nextWu = workUnits.next();
+      if (nextWu instanceof MultiWorkUnit) {
+        this.currentIterator = ((MultiWorkUnit) nextWu).getWorkUnits().iterator();
+        if (!this.currentIterator.hasNext()) {
+          nextWu = null;
+        }
+      }
     }
+
+    needSeek = false;
   }
 
   @Override
