This is an automated email from the ASF dual-hosted git repository.

zihanli58 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 665e14e  [GOBBLIN-1521] Create local mode of streaming kafka job to 
help user quickly onboard (#3372)
665e14e is described below

commit 665e14efd1b08b604bb666b4a89136bfca2e09d8
Author: Zihan Li <[email protected]>
AuthorDate: Mon Oct 18 15:53:55 2021 -0700

    [GOBBLIN-1521] Create local mode of streaming kafka job to help user 
quickly onboard (#3372)
    
    * [GOBBLIN-1521] Create local mode of streaming kafka job to help user 
quickly onboard
    
    * remove intended change
    
    * update the document
    
    * address comments
    
    * address comments
---
 .../Run-Gobblin-Streaming-kafka-hdfs-Locally.md    | 31 +++++++++++
 .../gobblin_jobs/kafka-hdfs-streaming-avro.conf    |  5 +-
 .../kafka/client/Kafka09ConsumerClient.java        | 24 ++++++++-
 .../gobblin/kafka/KafkaStreamingLocalTest.java     | 51 ++++++++++++++++++
 .../src/test/resources/kafkaHdfsStreaming.conf}    | 62 ++++++++++++----------
 .../extract/kafka/KafkaStreamingExtractor.java     |  6 ++-
 6 files changed, 145 insertions(+), 34 deletions(-)

diff --git 
a/gobblin-docs/user-guide/Run-Gobblin-Streaming-kafka-hdfs-Locally.md 
b/gobblin-docs/user-guide/Run-Gobblin-Streaming-kafka-hdfs-Locally.md
new file mode 100644
index 0000000..0dd977b
--- /dev/null
+++ b/gobblin-docs/user-guide/Run-Gobblin-Streaming-kafka-hdfs-Locally.md
@@ -0,0 +1,31 @@
+# Table of Contents
+
+[TOC]
+
+# Introduction
+
+Gobblin supports streaming mode that allows continuous ingestion of data from 
Kafka to HDFS. The streaming mode has been deployed in production at LinkedIn 
as a Gobblin cluster that uses Yarn for container allocation and Helix for task 
coordination.
+
+Here, we describe how to set up a Kafka -> HDFS pipeline in local mode for 
users to easily start and test out a streaming ingestion pipeline. 
+
+
+# Setup local kafka cluster 
+
+Follow [kafka quick start](https://kafka.apache.org/quickstart) to set up your 
kafka cluster, and create test topic "testEvents"
+
+# Run EmbeddedGobblin to start the job
+
+We use the configuration: 
/gobblin-modules/gobblin-kafka-09/src/test/resources/kafkaHDFSStreaming.conf to 
execute the job.
+
+To run the job, in your intellij, you can run the test in 
/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/kafka/KafkaStreamingLocalTest
+by removing the line '(enabled=false)'. In order to run the test in IDE, you 
may need to manually delete log4j-over-slf4j jar in IDE 
+
+Under your kafka dir, you can run following command to produce data into your 
kafka topic
+
+`bin/kafka-console-producer.sh --topic testEvents --bootstrap-server 
localhost:9092`
+
+The job will continually consume from testEvents and write out data as txt 
file onto your local fileSystem (/tmp/gobblin/kafka/publish). It will write put 
data every 60 seconds, and will never end until
+you manually kill it.
+
+If you want the job ingest data as avro/orc, you will need to have schema 
registry as schema source and change the job configuration to control the 
behavior, a sample configuration can be found 
[here](https://github.com/apache/gobblin/blob/master/gobblin-modules/gobblin-azkaban/src/main/resources/conf/gobblin_jobs/kafka-hdfs-streaming-avro.conf)
+
diff --git 
a/gobblin-modules/gobblin-azkaban/src/main/resources/conf/gobblin_jobs/kafka-hdfs-streaming-avro.conf
 
b/gobblin-modules/gobblin-azkaban/src/main/resources/conf/gobblin_jobs/kafka-hdfs-streaming-avro.conf
index 71c64dc..631ba3c 100644
--- 
a/gobblin-modules/gobblin-azkaban/src/main/resources/conf/gobblin_jobs/kafka-hdfs-streaming-avro.conf
+++ 
b/gobblin-modules/gobblin-azkaban/src/main/resources/conf/gobblin_jobs/kafka-hdfs-streaming-avro.conf
@@ -14,7 +14,7 @@ fork.record.queue.capacity=1
 
 # Streaming-source specific configurations
 
source.class=org.apache.gobblin.source.extractor.extract.kafka.UniversalKafkaSource
-gobblin.source.kafka.extractorType=org.apache.gobblin.prototype.kafka.KafkaAvroBinaryStreamingExtractor
+gobblin.source.kafka.extractorType=org.apache.gobblin.source.extractor.extract.kafka.KafkaStreamingExtractor
 kafka.workunit.size.estimator.type=CUSTOM
 
kafka.workunit.size.estimator.customizedType=org.apache.gobblin.source.extractor.extract.kafka.workunit.packer.UnitKafkaWorkUnitSizeEstimator
 kafka.workunit.packer.type=CUSTOM
@@ -27,9 +27,10 @@ streaming.watermark.commitIntervalMillis=2000
 
 # Converter configs
 # Default Generic Record based pipeline
-recordStreamProcessor.classes="org.apache.gobblin.prototype.kafka.GenericRecordBasedKafkaSchemaChangeInjector,org.apache.gobblin.prototype.kafka.LiKafkaConsumerRecordToGenericRecordConverter"
+recordStreamProcessor.classes="org.apache.gobblin.converter.GenericRecordBasedKafkaSchemaChangeInjector"
 
 # Record-metadata decoration into main record
+# This is not supported in OSS yet since we found decorate will require 
re-build generic record which is expansive
 gobblin.kafka.converter.recordMetadata.enable=true
 
 # Writer configs
diff --git 
a/gobblin-modules/gobblin-kafka-09/src/main/java/org/apache/gobblin/kafka/client/Kafka09ConsumerClient.java
 
b/gobblin-modules/gobblin-kafka-09/src/main/java/org/apache/gobblin/kafka/client/Kafka09ConsumerClient.java
index f277b9e..cab3b79 100644
--- 
a/gobblin-modules/gobblin-kafka-09/src/main/java/org/apache/gobblin/kafka/client/Kafka09ConsumerClient.java
+++ 
b/gobblin-modules/gobblin-kafka-09/src/main/java/org/apache/gobblin/kafka/client/Kafka09ConsumerClient.java
@@ -20,6 +20,7 @@ import java.io.IOException;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -27,6 +28,7 @@ import java.util.Map.Entry;
 import java.util.Properties;
 import java.util.stream.Collectors;
 
+import org.apache.gobblin.source.extractor.extract.LongWatermark;
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
@@ -144,18 +146,36 @@ public class Kafka09ConsumerClient<K, V> extends 
AbstractBaseKafkaConsumerClient
   public long getEarliestOffset(KafkaPartition partition) throws 
KafkaOffsetRetrievalFailureException {
     TopicPartition topicPartition = new 
TopicPartition(partition.getTopicName(), partition.getId());
     this.consumer.assign(Collections.singletonList(topicPartition));
+    long previousPosition = this.consumer.position(topicPartition);
     this.consumer.seekToBeginning(topicPartition);
+    long earliestOffset = this.consumer.position(topicPartition);
+    this.consumer.seek(topicPartition, previousPosition);
 
-    return this.consumer.position(topicPartition);
+    return earliestOffset;
   }
 
   @Override
   public long getLatestOffset(KafkaPartition partition) throws 
KafkaOffsetRetrievalFailureException {
     TopicPartition topicPartition = new 
TopicPartition(partition.getTopicName(), partition.getId());
     this.consumer.assign(Collections.singletonList(topicPartition));
+    long previousPosition = this.consumer.position(topicPartition);
     this.consumer.seekToEnd(topicPartition);
+    long latestOffset = this.consumer.position(topicPartition);
+    this.consumer.seek(topicPartition, previousPosition);
 
-    return this.consumer.position(topicPartition);
+    return latestOffset;
+  }
+
+  @Override
+  public void assignAndSeek(List<KafkaPartition> topicPartitions,
+      Map<KafkaPartition, LongWatermark> topicWatermarksMap) {
+    HashSet<KafkaPartition> topicPartitionSet = new HashSet(topicPartitions);
+    topicWatermarksMap.entrySet().stream().filter(entry -> 
topicPartitionSet.contains(entry.getKey()))
+        .forEach(entry -> {
+          TopicPartition topicPartition = new 
TopicPartition(entry.getKey().getTopicName(), entry.getKey().getId());
+          this.consumer.assign(Collections.singletonList(topicPartition));
+          this.consumer.seek(topicPartition, entry.getValue().getValue());
+        });
   }
 
   @Override
diff --git 
a/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/kafka/KafkaStreamingLocalTest.java
 
b/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/kafka/KafkaStreamingLocalTest.java
new file mode 100644
index 0000000..00d5a20
--- /dev/null
+++ 
b/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/kafka/KafkaStreamingLocalTest.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.kafka;
+
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeoutException;
+import org.apache.gobblin.runtime.api.JobExecutionResult;
+import org.apache.gobblin.runtime.embedded.EmbeddedGobblin;
+import org.apache.gobblin.writer.test.GobblinTestEventBusWriter;
+import org.apache.gobblin.writer.test.TestingEventBusAsserter;
+import org.testng.annotations.Test;
+
+
+public class KafkaStreamingLocalTest {
+  //disable the test as streaming task will never end unless manually kill it
+  @Test(enabled=false)
+  public void testStreamingLocally() {
+    String eventBusId = this.getClass().getName() + ".jobFileTest";
+
+    TestingEventBusAsserter asserter = new TestingEventBusAsserter(eventBusId);
+
+    EmbeddedGobblin embeddedGobblin =
+        new 
EmbeddedGobblin("TestJob").jobFile(this.getClass().getClassLoader().getResource("kafkaHdfsStreaming.conf").getPath());
+    
embeddedGobblin.setConfiguration(GobblinTestEventBusWriter.FULL_EVENTBUSID_KEY, 
eventBusId);
+
+    try {
+      JobExecutionResult result = embeddedGobblin.run();
+    } catch (InterruptedException e) {
+      e.printStackTrace();
+    } catch (TimeoutException e) {
+      e.printStackTrace();
+    } catch (ExecutionException e) {
+      e.printStackTrace();
+    }
+  }
+}
diff --git 
a/gobblin-modules/gobblin-azkaban/src/main/resources/conf/gobblin_jobs/kafka-hdfs-streaming-avro.conf
 b/gobblin-modules/gobblin-kafka-09/src/test/resources/kafkaHdfsStreaming.conf
similarity index 54%
copy from 
gobblin-modules/gobblin-azkaban/src/main/resources/conf/gobblin_jobs/kafka-hdfs-streaming-avro.conf
copy to 
gobblin-modules/gobblin-kafka-09/src/test/resources/kafkaHdfsStreaming.conf
index 71c64dc..ea308fe 100644
--- 
a/gobblin-modules/gobblin-azkaban/src/main/resources/conf/gobblin_jobs/kafka-hdfs-streaming-avro.conf
+++ 
b/gobblin-modules/gobblin-kafka-09/src/test/resources/kafkaHdfsStreaming.conf
@@ -14,7 +14,7 @@ fork.record.queue.capacity=1
 
 # Streaming-source specific configurations
 
source.class=org.apache.gobblin.source.extractor.extract.kafka.UniversalKafkaSource
-gobblin.source.kafka.extractorType=org.apache.gobblin.prototype.kafka.KafkaAvroBinaryStreamingExtractor
+gobblin.source.kafka.extractorType=org.apache.gobblin.source.extractor.extract.kafka.KafkaStreamingExtractor
 kafka.workunit.size.estimator.type=CUSTOM
 
kafka.workunit.size.estimator.customizedType=org.apache.gobblin.source.extractor.extract.kafka.workunit.packer.UnitKafkaWorkUnitSizeEstimator
 kafka.workunit.packer.type=CUSTOM
@@ -22,22 +22,24 @@ 
kafka.workunit.packer.customizedType=org.apache.gobblin.source.extractor.extract
 extract.namespace=org.apache.gobblin.streaming.test
 
 # Configure watermark storage for streaming, using FS-based for local testing
-streaming.watermarkStateStore.type=fs
+streaming.watermarkStateStore.type=fc
 streaming.watermark.commitIntervalMillis=2000
 
 # Converter configs
 # Default Generic Record based pipeline
-recordStreamProcessor.classes="org.apache.gobblin.prototype.kafka.GenericRecordBasedKafkaSchemaChangeInjector,org.apache.gobblin.prototype.kafka.LiKafkaConsumerRecordToGenericRecordConverter"
-
+# only use this config when kafka schemaRegistry is enabled
+#recordStreamProcessor.classes="org.apache.gobblin.converter.GenericRecordBasedKafkaSchemaChangeInjector"
+recordStreamProcessor.classes="org.apache.gobblin.converter.string.KafkaRecordToStringConverter,
 org.apache.gobblin.converter.string.StringToBytesConverter"
 # Record-metadata decoration into main record
+# This is not supported in OSS yet since we found decorate will require 
re-build generic record which is expansive
 gobblin.kafka.converter.recordMetadata.enable=true
 
 # Writer configs
-writer.builder.class=org.apache.gobblin.writer.AvroDataWriterBuilder
-writer.partitioner.class=org.apache.gobblin.writer.partitioner.TimeBasedAvroWriterPartitioner
-writer.output.format=AVRO
-writer.partition.columns=header.time
-writer.partition.pattern=yyyy/MM/dd
+writer.builder.class=org.apache.gobblin.writer.SimpleDataWriterBuilder
+#writer.partitioner.class=org.apache.gobblin.writer.partitioner.TimeBasedAvroWriterPartitioner
+writer.output.format=txt
+#writer.partition.columns=header.time
+#writer.partition.pattern=yyyy/MM/dd
 writer.destination.type=HDFS
 writer.staging.dir=/tmp/gobblin/streaming/writer-staging
 writer.output.dir=/tmp/gobblin/streaming/writer-output
@@ -48,40 +50,44 @@ state.store.enabled=false
 # Publisher config
 data.publisher.type=org.apache.gobblin.publisher.NoopPublisher
 data.publisher.final.dir=/tmp/gobblin/kafka/publish
-flush.data.publisher.class=org.apache.gobblin.prototype.kafka.TimePartitionedStreamingDataPublisher
+flush.data.publisher.class=org.apache.gobblin.publisher.TimePartitionedStreamingDataPublisher
 ###Config that controls intervals between flushes (and consequently, data 
publish)
 stream.flush.interval.secs=60
 
 ### Following are Kafka Upstream related configurations
 # Kafka source configurations
-topic.whitelist=
+topic.whitelist=testEvents
 bootstrap.with.offset=EARLIEST
 source.kafka.fetchTimeoutMillis=3000
 kafka.consumer.maxPollRecords=100
 
 #Kafka broker/schema registry configs
-kafka.schema.registry.url=
-kafka.schema.registry.class=
-kafka.schemaRegistry.class=
-kafka.schemaRegistry.url=
-kafka.brokers=
+#kafka.schema.registry.url=
+#kafka.schema.registry.class=org.apache.gobblin.metrics.kafka.KafkaAvroSchemaRegistry
+#kafka.schemaRegistry.class=org.apache.gobblin.metrics.kafka.KafkaAvroSchemaRegistry
+#kafka.schemaRegistry.url=
+kafka.brokers="localhost:9092"
 
 #Kafka SSL configs
-security.protocol = SSL
-ssl.protocol = TLS
-ssl.trustmanager.algorithm =
-ssl.keymanager.algorithm =
-ssl.truststore.type =
-ssl.truststore.location =
-ssl.truststore.password =
-ssl.keystore.type =
-ssl.keystore.password =
-ssl.key.password =
-ssl.secure.random.implementation =
-ssl.keystore.location=<path to your kafka certs>
+#security.protocol = SSL
+#ssl.protocol = TLS
+#ssl.trustmanager.algorithm =
+#ssl.keymanager.algorithm =
+#ssl.truststore.type =
+#ssl.truststore.location =
+#ssl.truststore.password =
+#ssl.keystore.type =
+#ssl.keystore.password =
+#ssl.key.password =
+#ssl.secure.random.implementation =
+#ssl.keystore.location=<path to your kafka certs>
 
 metrics.enabled=false
 
+# consumer client config
+gobblin.kafka.consumerClient.class="org.apache.gobblin.kafka.client.Kafka09ConsumerClient$Factory"
+source.kafka.value.deserializer="org.apache.gobblin.source.extractor.extract.kafka.Kafka09JsonSource$KafkaGsonDeserializer"
+
 # Only Required for Local-testing
 kafka.consumer.runtimeIngestionPropsEnabled=false
 # Limit single mappers for ease of debugging
diff --git 
a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaStreamingExtractor.java
 
b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaStreamingExtractor.java
index a29bd0f..b2045ac 100644
--- 
a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaStreamingExtractor.java
+++ 
b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaStreamingExtractor.java
@@ -323,8 +323,10 @@ public class KafkaStreamingExtractor<S> extends 
FlushingExtractor<S, DecodeableK
   @Override
   public S getSchema() {
     try {
-      Schema schema = (Schema) 
this._schemaRegistry.get().getLatestSchemaByTopic(this.topicPartitions.get(0).getTopicName());
-      return (S) schema;
+      if(this._schemaRegistry.isPresent()) {
+        return (S)(Schema) 
this._schemaRegistry.get().getLatestSchemaByTopic(this.topicPartitions.get(0).getTopicName());
+      }
+      return (S) this.topicPartitions.iterator().next().getTopicName();
     } catch (SchemaRegistryException e) {
       e.printStackTrace();
     }

Reply via email to