This is an automated email from the ASF dual-hosted git repository.
zihanli58 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 665e14e [GOBBLIN-1521] Create local mode of streaming kafka job to help user quickly onboard (#3372)
665e14e is described below
commit 665e14efd1b08b604bb666b4a89136bfca2e09d8
Author: Zihan Li <[email protected]>
AuthorDate: Mon Oct 18 15:53:55 2021 -0700
[GOBBLIN-1521] Create local mode of streaming kafka job to help user quickly onboard (#3372)
* [GOBBLIN-1521] Create local mode of streaming kafka job to help user quickly onboard
* remove intended change
* update the document
* address comments
* address comments
---
.../Run-Gobblin-Streaming-kafka-hdfs-Locally.md | 31 +++++++++++
.../gobblin_jobs/kafka-hdfs-streaming-avro.conf | 5 +-
.../kafka/client/Kafka09ConsumerClient.java | 24 ++++++++-
.../gobblin/kafka/KafkaStreamingLocalTest.java | 51 ++++++++++++++++++
.../src/test/resources/kafkaHdfsStreaming.conf} | 62 ++++++++++++----------
.../extract/kafka/KafkaStreamingExtractor.java | 6 ++-
6 files changed, 145 insertions(+), 34 deletions(-)
diff --git a/gobblin-docs/user-guide/Run-Gobblin-Streaming-kafka-hdfs-Locally.md b/gobblin-docs/user-guide/Run-Gobblin-Streaming-kafka-hdfs-Locally.md
new file mode 100644
index 0000000..0dd977b
--- /dev/null
+++ b/gobblin-docs/user-guide/Run-Gobblin-Streaming-kafka-hdfs-Locally.md
@@ -0,0 +1,31 @@
+# Table of Contents
+
+[TOC]
+
+# Introduction
+
+Gobblin supports a streaming mode that allows continuous ingestion of data from Kafka to HDFS. The streaming mode has been deployed in production at LinkedIn as a Gobblin cluster that uses Yarn for container allocation and Helix for task coordination.
+
+Here, we describe how to set up a Kafka -> HDFS pipeline in local mode so that users can easily start and test out a streaming ingestion pipeline.
+
+
+# Set up a local Kafka cluster
+
+Follow the [Kafka quick start](https://kafka.apache.org/quickstart) to set up your Kafka cluster, and create the test topic "testEvents".
+
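+If you prefer to create the topic programmatically, here is a minimal sketch using Kafka's AdminClient (an assumption on our part: the admin API ships with kafka-clients 0.11+, which is newer than the 0.9 client this module wraps, and a broker is running on localhost:9092):
+
+```java
+import java.util.Collections;
+import java.util.Properties;
+
+import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.admin.NewTopic;
+
+public class CreateTestTopic {
+  public static void main(String[] args) throws Exception {
+    Properties props = new Properties();
+    props.put("bootstrap.servers", "localhost:9092");
+    try (AdminClient admin = AdminClient.create(props)) {
+      // one partition with replication factor 1 is enough for local testing
+      admin.createTopics(Collections.singleton(new NewTopic("testEvents", 1, (short) 1))).all().get();
+    }
+  }
+}
+```
+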
+# Run EmbeddedGobblin to start the job
+
+We use the configuration file /gobblin-modules/gobblin-kafka-09/src/test/resources/kafkaHdfsStreaming.conf to execute the job.
+
+To run the job from your IDE (e.g. IntelliJ), run the test in /gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/kafka/KafkaStreamingLocalTest
+after removing the annotation argument '(enabled=false)'. To run the test in the IDE, you may also need to manually remove the log4j-over-slf4j jar from the classpath.
+
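+For reference, here is a rough standalone sketch of what the test does (the class name is hypothetical, and it assumes kafkaHdfsStreaming.conf is on the classpath):
+
+```java
+import org.apache.gobblin.runtime.embedded.EmbeddedGobblin;
+
+public class RunKafkaStreamingLocally {
+  public static void main(String[] args) throws Exception {
+    // resolve the job file from test resources; adjust the path for your checkout
+    String jobFile = RunKafkaStreamingLocally.class.getClassLoader()
+        .getResource("kafkaHdfsStreaming.conf").getPath();
+    // blocks indefinitely: the streaming source runs until the process is killed
+    new EmbeddedGobblin("KafkaStreamingLocal").jobFile(jobFile).run();
+  }
+}
+```
+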
+From your Kafka directory, you can run the following command to produce data into your Kafka topic:
+
+`bin/kafka-console-producer.sh --topic testEvents --bootstrap-server localhost:9092`
+
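+If you would rather produce records from Java, a minimal producer sketch (assuming the kafka-clients jar is on the classpath; plain string values are our assumption for the txt pipeline):
+
+```java
+import java.util.Properties;
+
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+
+public class ProduceTestEvents {
+  public static void main(String[] args) {
+    Properties props = new Properties();
+    props.put("bootstrap.servers", "localhost:9092");
+    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
+    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
+    try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
+      for (int i = 0; i < 100; i++) {
+        producer.send(new ProducerRecord<>("testEvents", "event-" + i));
+      }
+    }
+  }
+}
+```
+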
+The job will continually consume from testEvents and write the data out as text files on your local file system (/tmp/gobblin/kafka/publish). It will flush data every 60 seconds, and will not stop until
+you manually kill it.
+
+If you want the job to ingest data as Avro/ORC, you will need a schema registry as the schema source and you will need to change the job configuration accordingly; a sample configuration can be found
+[here](https://github.com/apache/gobblin/blob/master/gobblin-modules/gobblin-azkaban/src/main/resources/conf/gobblin_jobs/kafka-hdfs-streaming-avro.conf).
+
diff --git a/gobblin-modules/gobblin-azkaban/src/main/resources/conf/gobblin_jobs/kafka-hdfs-streaming-avro.conf b/gobblin-modules/gobblin-azkaban/src/main/resources/conf/gobblin_jobs/kafka-hdfs-streaming-avro.conf
index 71c64dc..631ba3c 100644
--- a/gobblin-modules/gobblin-azkaban/src/main/resources/conf/gobblin_jobs/kafka-hdfs-streaming-avro.conf
+++ b/gobblin-modules/gobblin-azkaban/src/main/resources/conf/gobblin_jobs/kafka-hdfs-streaming-avro.conf
@@ -14,7 +14,7 @@ fork.record.queue.capacity=1
# Streaming-source specific configurations
source.class=org.apache.gobblin.source.extractor.extract.kafka.UniversalKafkaSource
-gobblin.source.kafka.extractorType=org.apache.gobblin.prototype.kafka.KafkaAvroBinaryStreamingExtractor
+gobblin.source.kafka.extractorType=org.apache.gobblin.source.extractor.extract.kafka.KafkaStreamingExtractor
kafka.workunit.size.estimator.type=CUSTOM
kafka.workunit.size.estimator.customizedType=org.apache.gobblin.source.extractor.extract.kafka.workunit.packer.UnitKafkaWorkUnitSizeEstimator
kafka.workunit.packer.type=CUSTOM
@@ -27,9 +27,10 @@ streaming.watermark.commitIntervalMillis=2000
# Converter configs
# Default Generic Record based pipeline
-recordStreamProcessor.classes="org.apache.gobblin.prototype.kafka.GenericRecordBasedKafkaSchemaChangeInjector,org.apache.gobblin.prototype.kafka.LiKafkaConsumerRecordToGenericRecordConverter"
+recordStreamProcessor.classes="org.apache.gobblin.converter.GenericRecordBasedKafkaSchemaChangeInjector"
# Record-metadata decoration into main record
+# This is not supported in OSS yet, since we found that decorating requires rebuilding the generic record, which is expensive
gobblin.kafka.converter.recordMetadata.enable=true
# Writer configs
diff --git a/gobblin-modules/gobblin-kafka-09/src/main/java/org/apache/gobblin/kafka/client/Kafka09ConsumerClient.java b/gobblin-modules/gobblin-kafka-09/src/main/java/org/apache/gobblin/kafka/client/Kafka09ConsumerClient.java
index f277b9e..cab3b79 100644
--- a/gobblin-modules/gobblin-kafka-09/src/main/java/org/apache/gobblin/kafka/client/Kafka09ConsumerClient.java
+++ b/gobblin-modules/gobblin-kafka-09/src/main/java/org/apache/gobblin/kafka/client/Kafka09ConsumerClient.java
@@ -20,6 +20,7 @@ import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@@ -27,6 +28,7 @@ import java.util.Map.Entry;
import java.util.Properties;
import java.util.stream.Collectors;
+import org.apache.gobblin.source.extractor.extract.LongWatermark;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
@@ -144,18 +146,36 @@ public class Kafka09ConsumerClient<K, V> extends AbstractBaseKafkaConsumerClient
  public long getEarliestOffset(KafkaPartition partition) throws KafkaOffsetRetrievalFailureException {
    TopicPartition topicPartition = new TopicPartition(partition.getTopicName(), partition.getId());
this.consumer.assign(Collections.singletonList(topicPartition));
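+    // save the current position so that this lookup does not disturb the consumer's read position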
+ long previousPosition = this.consumer.position(topicPartition);
this.consumer.seekToBeginning(topicPartition);
+ long earliestOffset = this.consumer.position(topicPartition);
+ this.consumer.seek(topicPartition, previousPosition);
- return this.consumer.position(topicPartition);
+ return earliestOffset;
}
@Override
  public long getLatestOffset(KafkaPartition partition) throws KafkaOffsetRetrievalFailureException {
    TopicPartition topicPartition = new TopicPartition(partition.getTopicName(), partition.getId());
this.consumer.assign(Collections.singletonList(topicPartition));
+ long previousPosition = this.consumer.position(topicPartition);
this.consumer.seekToEnd(topicPartition);
+ long latestOffset = this.consumer.position(topicPartition);
+ this.consumer.seek(topicPartition, previousPosition);
- return this.consumer.position(topicPartition);
+ return latestOffset;
+ }
+
+ @Override
+ public void assignAndSeek(List<KafkaPartition> topicPartitions,
+ Map<KafkaPartition, LongWatermark> topicWatermarksMap) {
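+    // seek each partition that has a recorded watermark to that offset, so consumption resumes from the last committed point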
+    HashSet<KafkaPartition> topicPartitionSet = new HashSet<>(topicPartitions);
+    topicWatermarksMap.entrySet().stream().filter(entry -> topicPartitionSet.contains(entry.getKey()))
+ .forEach(entry -> {
+          TopicPartition topicPartition = new TopicPartition(entry.getKey().getTopicName(), entry.getKey().getId());
+ this.consumer.assign(Collections.singletonList(topicPartition));
+ this.consumer.seek(topicPartition, entry.getValue().getValue());
+ });
}
@Override
diff --git a/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/kafka/KafkaStreamingLocalTest.java b/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/kafka/KafkaStreamingLocalTest.java
new file mode 100644
index 0000000..00d5a20
--- /dev/null
+++ b/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/kafka/KafkaStreamingLocalTest.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.kafka;
+
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeoutException;
+import org.apache.gobblin.runtime.api.JobExecutionResult;
+import org.apache.gobblin.runtime.embedded.EmbeddedGobblin;
+import org.apache.gobblin.writer.test.GobblinTestEventBusWriter;
+import org.apache.gobblin.writer.test.TestingEventBusAsserter;
+import org.testng.annotations.Test;
+
+
+public class KafkaStreamingLocalTest {
+  // The test is disabled by default because the streaming task never ends unless it is manually killed
+ @Test(enabled=false)
+ public void testStreamingLocally() {
+ String eventBusId = this.getClass().getName() + ".jobFileTest";
+
+ TestingEventBusAsserter asserter = new TestingEventBusAsserter(eventBusId);
+
+    EmbeddedGobblin embeddedGobblin =
+        new EmbeddedGobblin("TestJob").jobFile(this.getClass().getClassLoader().getResource("kafkaHdfsStreaming.conf").getPath());
+
+    embeddedGobblin.setConfiguration(GobblinTestEventBusWriter.FULL_EVENTBUSID_KEY, eventBusId);
+
+ try {
+ JobExecutionResult result = embeddedGobblin.run();
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ } catch (TimeoutException e) {
+ e.printStackTrace();
+ } catch (ExecutionException e) {
+ e.printStackTrace();
+ }
+ }
+}
diff --git a/gobblin-modules/gobblin-azkaban/src/main/resources/conf/gobblin_jobs/kafka-hdfs-streaming-avro.conf b/gobblin-modules/gobblin-kafka-09/src/test/resources/kafkaHdfsStreaming.conf
similarity index 54%
copy from gobblin-modules/gobblin-azkaban/src/main/resources/conf/gobblin_jobs/kafka-hdfs-streaming-avro.conf
copy to gobblin-modules/gobblin-kafka-09/src/test/resources/kafkaHdfsStreaming.conf
index 71c64dc..ea308fe 100644
--- a/gobblin-modules/gobblin-azkaban/src/main/resources/conf/gobblin_jobs/kafka-hdfs-streaming-avro.conf
+++ b/gobblin-modules/gobblin-kafka-09/src/test/resources/kafkaHdfsStreaming.conf
@@ -14,7 +14,7 @@ fork.record.queue.capacity=1
# Streaming-source specific configurations
source.class=org.apache.gobblin.source.extractor.extract.kafka.UniversalKafkaSource
-gobblin.source.kafka.extractorType=org.apache.gobblin.prototype.kafka.KafkaAvroBinaryStreamingExtractor
+gobblin.source.kafka.extractorType=org.apache.gobblin.source.extractor.extract.kafka.KafkaStreamingExtractor
kafka.workunit.size.estimator.type=CUSTOM
kafka.workunit.size.estimator.customizedType=org.apache.gobblin.source.extractor.extract.kafka.workunit.packer.UnitKafkaWorkUnitSizeEstimator
kafka.workunit.packer.type=CUSTOM
@@ -22,22 +22,24 @@ kafka.workunit.packer.customizedType=org.apache.gobblin.source.extractor.extract
extract.namespace=org.apache.gobblin.streaming.test
# Configure watermark storage for streaming, using FS-based for local testing
-streaming.watermarkStateStore.type=fs
+streaming.watermarkStateStore.type=fc
streaming.watermark.commitIntervalMillis=2000
# Converter configs
# Default Generic Record based pipeline
-recordStreamProcessor.classes="org.apache.gobblin.prototype.kafka.GenericRecordBasedKafkaSchemaChangeInjector,org.apache.gobblin.prototype.kafka.LiKafkaConsumerRecordToGenericRecordConverter"
-
+# Only use this config when a Kafka schema registry is enabled
+#recordStreamProcessor.classes="org.apache.gobblin.converter.GenericRecordBasedKafkaSchemaChangeInjector"
+recordStreamProcessor.classes="org.apache.gobblin.converter.string.KafkaRecordToStringConverter, org.apache.gobblin.converter.string.StringToBytesConverter"
# Record-metadata decoration into main record
+# This is not supported in OSS yet, since we found that decorating requires rebuilding the generic record, which is expensive
gobblin.kafka.converter.recordMetadata.enable=true
# Writer configs
-writer.builder.class=org.apache.gobblin.writer.AvroDataWriterBuilder
-writer.partitioner.class=org.apache.gobblin.writer.partitioner.TimeBasedAvroWriterPartitioner
-writer.output.format=AVRO
-writer.partition.columns=header.time
-writer.partition.pattern=yyyy/MM/dd
+writer.builder.class=org.apache.gobblin.writer.SimpleDataWriterBuilder
+#writer.partitioner.class=org.apache.gobblin.writer.partitioner.TimeBasedAvroWriterPartitioner
+writer.output.format=txt
+#writer.partition.columns=header.time
+#writer.partition.pattern=yyyy/MM/dd
writer.destination.type=HDFS
writer.staging.dir=/tmp/gobblin/streaming/writer-staging
writer.output.dir=/tmp/gobblin/streaming/writer-output
@@ -48,40 +50,44 @@ state.store.enabled=false
# Publisher config
data.publisher.type=org.apache.gobblin.publisher.NoopPublisher
data.publisher.final.dir=/tmp/gobblin/kafka/publish
-flush.data.publisher.class=org.apache.gobblin.prototype.kafka.TimePartitionedStreamingDataPublisher
+flush.data.publisher.class=org.apache.gobblin.publisher.TimePartitionedStreamingDataPublisher
###Config that controls intervals between flushes (and consequently, data publish)
stream.flush.interval.secs=60
### Following are Kafka Upstream related configurations
# Kafka source configurations
-topic.whitelist=
+topic.whitelist=testEvents
bootstrap.with.offset=EARLIEST
source.kafka.fetchTimeoutMillis=3000
kafka.consumer.maxPollRecords=100
#Kafka broker/schema registry configs
-kafka.schema.registry.url=
-kafka.schema.registry.class=
-kafka.schemaRegistry.class=
-kafka.schemaRegistry.url=
-kafka.brokers=
+#kafka.schema.registry.url=
+#kafka.schema.registry.class=org.apache.gobblin.metrics.kafka.KafkaAvroSchemaRegistry
+#kafka.schemaRegistry.class=org.apache.gobblin.metrics.kafka.KafkaAvroSchemaRegistry
+#kafka.schemaRegistry.url=
+kafka.brokers="localhost:9092"
#Kafka SSL configs
-security.protocol = SSL
-ssl.protocol = TLS
-ssl.trustmanager.algorithm =
-ssl.keymanager.algorithm =
-ssl.truststore.type =
-ssl.truststore.location =
-ssl.truststore.password =
-ssl.keystore.type =
-ssl.keystore.password =
-ssl.key.password =
-ssl.secure.random.implementation =
-ssl.keystore.location=<path to your kafka certs>
+#security.protocol = SSL
+#ssl.protocol = TLS
+#ssl.trustmanager.algorithm =
+#ssl.keymanager.algorithm =
+#ssl.truststore.type =
+#ssl.truststore.location =
+#ssl.truststore.password =
+#ssl.keystore.type =
+#ssl.keystore.password =
+#ssl.key.password =
+#ssl.secure.random.implementation =
+#ssl.keystore.location=<path to your kafka certs>
metrics.enabled=false
+# consumer client config
+gobblin.kafka.consumerClient.class="org.apache.gobblin.kafka.client.Kafka09ConsumerClient$Factory"
+source.kafka.value.deserializer="org.apache.gobblin.source.extractor.extract.kafka.Kafka09JsonSource$KafkaGsonDeserializer"
+
# Only Required for Local-testing
kafka.consumer.runtimeIngestionPropsEnabled=false
# Limit single mappers for ease of debugging
diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaStreamingExtractor.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaStreamingExtractor.java
index a29bd0f..b2045ac 100644
--- a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaStreamingExtractor.java
+++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaStreamingExtractor.java
@@ -323,8 +323,10 @@ public class KafkaStreamingExtractor<S> extends FlushingExtractor<S, DecodeableK
@Override
public S getSchema() {
try {
-      Schema schema = (Schema) this._schemaRegistry.get().getLatestSchemaByTopic(this.topicPartitions.get(0).getTopicName());
-      return (S) schema;
+ if(this._schemaRegistry.isPresent()) {
+        return (S)(Schema) this._schemaRegistry.get().getLatestSchemaByTopic(this.topicPartitions.get(0).getTopicName());
+ }
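+      // no schema registry configured (e.g. local text-mode testing): fall back to the topic name as the schema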
+ return (S) this.topicPartitions.iterator().next().getTopicName();
} catch (SchemaRegistryException e) {
e.printStackTrace();
}