Repository: incubator-gobblin
Updated Branches:
  refs/heads/master 278b48d41 -> 90d8495ae


[GOBBLIN-296] Kafka json source and writer

Closes #2150 from zxcware/odsc


Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/90d8495a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/90d8495a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/90d8495a

Branch: refs/heads/master
Commit: 90d8495ae5b057962a500eaaa78b52a02e07adbe
Parents: 278b48d
Author: zhchen <[email protected]>
Authored: Fri Oct 27 10:50:37 2017 -0700
Committer: Hung Tran <[email protected]>
Committed: Fri Oct 27 10:50:37 2017 -0700

----------------------------------------------------------------------
 .../kafka/writer/KafkaDataWriterBuilder.java    |  11 +-
 .../extract/kafka/KafkaGsonDeserializer.java    |  20 +--
 .../kafka/client/Kafka09ConsumerClient.java     |  34 ++---
 .../gobblin/kafka/writer/Kafka09DataWriter.java |  13 +-
 .../writer/Kafka09JsonObjectWriterBuilder.java  |  49 +++++++
 .../kafka/writer/KafkaDataWriterBuilder.java    |   3 +-
 .../extract/kafka/Kafka09JsonSource.java        |  90 ++++++++++++
 .../kafka/Kafka09JsonIntegrationTest.java       | 140 +++++++++++++++++++
 .../kafka/serialize/GsonDeserializerBase.java   |  44 ++++++
 .../kafka/serialize/GsonSerializerBase.java     |  46 ++++++
 .../writer/AbstractKafkaDataWriterBuilder.java  |  68 +++++++++
 .../writer/BaseKafkaDataWriterBuilder.java      |  50 +------
 .../gobblin/kafka/writer/KafkaWriterHelper.java |  22 +--
 .../util/MultiWorkUnitUnpackingIterator.java    |  54 +++++--
 14 files changed, 518 insertions(+), 126 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/90d8495a/gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/kafka/writer/KafkaDataWriterBuilder.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/kafka/writer/KafkaDataWriterBuilder.java b/gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/kafka/writer/KafkaDataWriterBuilder.java
index 2d8aafc..7adbba1 100644
--- a/gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/kafka/writer/KafkaDataWriterBuilder.java
+++ b/gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/kafka/writer/KafkaDataWriterBuilder.java
@@ -17,29 +17,20 @@
 
 package org.apache.gobblin.kafka.writer;
 
-import java.io.IOException;
 import java.util.Properties;
 
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericRecord;
 
-import com.typesafe.config.Config;
-
-import org.apache.gobblin.configuration.State;
-import org.apache.gobblin.util.ConfigUtils;
 import org.apache.gobblin.writer.AsyncDataWriter;
-import org.apache.gobblin.writer.DataWriter;
-import org.apache.gobblin.writer.DataWriterBuilder;
-import org.apache.gobblin.writer.PartitionAwareDataWriterBuilder;
 
 /**
  * Builder that hands back a {@link Kafka08DataWriter}
  */
-public class KafkaDataWriterBuilder extends BaseKafkaDataWriterBuilder {
+public class KafkaDataWriterBuilder extends AbstractKafkaDataWriterBuilder<Schema, GenericRecord> {
 
   @Override
   protected AsyncDataWriter<GenericRecord> getAsyncDataWriter(Properties props) {
     return new Kafka08DataWriter<>(props);
   }
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/90d8495a/gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaGsonDeserializer.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaGsonDeserializer.java b/gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaGsonDeserializer.java
index e297d96..6556644 100644
--- a/gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaGsonDeserializer.java
+++ b/gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaGsonDeserializer.java
@@ -19,7 +19,6 @@ package org.apache.gobblin.source.extractor.extract.kafka;
 
 import java.nio.charset.Charset;
 import java.nio.charset.StandardCharsets;
-import java.util.Map;
 
 import org.apache.kafka.common.serialization.Deserializer;
 
@@ -27,30 +26,17 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.gson.Gson;
 import com.google.gson.JsonElement;
 
+import org.apache.gobblin.kafka.serialize.GsonDeserializerBase;
+
 
 /**
  * Implementation of {@link Deserializer} that deserializes Kafka data into a {@link JsonElement} using the
  * {@link StandardCharsets#UTF_8} encoding.
  */
-public class KafkaGsonDeserializer implements Deserializer<JsonElement> {
+public class KafkaGsonDeserializer extends GsonDeserializerBase<JsonElement> implements Deserializer<JsonElement> {
 
   private static final Gson GSON = new Gson();
 
   @VisibleForTesting
   static final Charset CHARSET = StandardCharsets.UTF_8;
-
-  @Override
-  public void configure(Map<String, ?> configs, boolean isKey) {
-    // Do nothing
-  }
-
-  @Override
-  public JsonElement deserialize(String topic, byte[] data) {
-    return GSON.fromJson(new String(data, CHARSET), JsonElement.class);
-  }
-
-  @Override
-  public void close() {
-    // Do nothing
-  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/90d8495a/gobblin-modules/gobblin-kafka-09/src/main/java/org/apache/gobblin/kafka/client/Kafka09ConsumerClient.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-kafka-09/src/main/java/org/apache/gobblin/kafka/client/Kafka09ConsumerClient.java b/gobblin-modules/gobblin-kafka-09/src/main/java/org/apache/gobblin/kafka/client/Kafka09ConsumerClient.java
index 9b952ab..4943ac5 100644
--- a/gobblin-modules/gobblin-kafka-09/src/main/java/org/apache/gobblin/kafka/client/Kafka09ConsumerClient.java
+++ b/gobblin-modules/gobblin-kafka-09/src/main/java/org/apache/gobblin/kafka/client/Kafka09ConsumerClient.java
@@ -22,12 +22,6 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map.Entry;
 import java.util.Properties;
-import java.util.regex.Pattern;
-
-import javax.annotation.Nonnull;
-
-import lombok.EqualsAndHashCode;
-import lombok.ToString;
 
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
@@ -39,17 +33,21 @@ import org.apache.kafka.common.TopicPartition;
 import com.google.common.base.Function;
 import com.google.common.base.Joiner;
 import com.google.common.base.Preconditions;
-import com.google.common.base.Predicate;
 import com.google.common.collect.FluentIterable;
+import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Iterators;
 import com.google.common.collect.Lists;
 import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+
+import javax.annotation.Nonnull;
+import lombok.EqualsAndHashCode;
+import lombok.ToString;
 
 import org.apache.gobblin.source.extractor.extract.kafka.KafkaOffsetRetrievalFailureException;
 import org.apache.gobblin.source.extractor.extract.kafka.KafkaPartition;
 import org.apache.gobblin.source.extractor.extract.kafka.KafkaTopic;
 import org.apache.gobblin.util.ConfigUtils;
-import org.apache.gobblin.util.DatasetFilterUtils;
 
 
 /**
@@ -66,38 +64,40 @@ public class Kafka09ConsumerClient<K, V> extends AbstractBaseKafkaConsumerClient
   private static final String KAFKA_09_CLIENT_SESSION_TIMEOUT_KEY = "session.timeout.ms";
   private static final String KAFKA_09_CLIENT_KEY_DESERIALIZER_CLASS_KEY = "key.deserializer";
   private static final String KAFKA_09_CLIENT_VALUE_DESERIALIZER_CLASS_KEY = "value.deserializer";
+  private static final String KAFKA_09_CLIENT_GROUP_ID = "group.id";
 
   private static final String KAFKA_09_DEFAULT_ENABLE_AUTO_COMMIT = Boolean.toString(false);
   private static final String KAFKA_09_DEFAULT_KEY_DESERIALIZER =
       "org.apache.kafka.common.serialization.StringDeserializer";
+  private static final String KAFKA_09_DEFAULT_GROUP_ID = "kafka09";
 
   public static final String GOBBLIN_CONFIG_KEY_DESERIALIZER_CLASS_KEY = CONFIG_PREFIX
       + KAFKA_09_CLIENT_KEY_DESERIALIZER_CLASS_KEY;
   public static final String GOBBLIN_CONFIG_VALUE_DESERIALIZER_CLASS_KEY = CONFIG_PREFIX
       + KAFKA_09_CLIENT_VALUE_DESERIALIZER_CLASS_KEY;
 
+  private static final Config FALLBACK =
+      ConfigFactory.parseMap(ImmutableMap.<String, Object>builder()
+          .put(KAFKA_09_CLIENT_ENABLE_AUTO_COMMIT_KEY, KAFKA_09_DEFAULT_ENABLE_AUTO_COMMIT)
+          .put(KAFKA_09_CLIENT_KEY_DESERIALIZER_CLASS_KEY, KAFKA_09_DEFAULT_KEY_DESERIALIZER)
+          .put(KAFKA_09_CLIENT_GROUP_ID, KAFKA_09_DEFAULT_GROUP_ID)
+          .build());
+
   private final Consumer<K, V> consumer;
 
   private Kafka09ConsumerClient(Config config) {
     super(config);
     Preconditions.checkArgument(config.hasPath(GOBBLIN_CONFIG_VALUE_DESERIALIZER_CLASS_KEY),
-        "Missing required property " + GOBBLIN_CONFIG_KEY_DESERIALIZER_CLASS_KEY);
-
-    Config scopedConfig = ConfigUtils.getConfigOrEmpty(config, AbstractBaseKafkaConsumerClient.CONFIG_PREFIX_NO_DOT);
+        "Missing required property " + GOBBLIN_CONFIG_VALUE_DESERIALIZER_CLASS_KEY);
 
     Properties props = new Properties();
     props.put(KAFKA_09_CLIENT_BOOTSTRAP_SERVERS_KEY, Joiner.on(",").join(super.brokers));
-    props.put(KAFKA_09_CLIENT_ENABLE_AUTO_COMMIT_KEY, KAFKA_09_DEFAULT_ENABLE_AUTO_COMMIT);
     props.put(KAFKA_09_CLIENT_SESSION_TIMEOUT_KEY, super.socketTimeoutMillis);
-    props.put(KAFKA_09_CLIENT_KEY_DESERIALIZER_CLASS_KEY,
-        ConfigUtils.getString(config, GOBBLIN_CONFIG_KEY_DESERIALIZER_CLASS_KEY, KAFKA_09_DEFAULT_KEY_DESERIALIZER));
-    props.put(KAFKA_09_CLIENT_VALUE_DESERIALIZER_CLASS_KEY,
-        config.getString(GOBBLIN_CONFIG_VALUE_DESERIALIZER_CLASS_KEY));
 
+    Config scopedConfig = config.getConfig(CONFIG_PREFIX_NO_DOT).withFallback(FALLBACK);
     props.putAll(ConfigUtils.configToProperties(scopedConfig));
 
     this.consumer = new KafkaConsumer<>(props);
-
   }
 
   public Kafka09ConsumerClient(Config config, Consumer<K, V> consumer) {

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/90d8495a/gobblin-modules/gobblin-kafka-09/src/main/java/org/apache/gobblin/kafka/writer/Kafka09DataWriter.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-kafka-09/src/main/java/org/apache/gobblin/kafka/writer/Kafka09DataWriter.java b/gobblin-modules/gobblin-kafka-09/src/main/java/org/apache/gobblin/kafka/writer/Kafka09DataWriter.java
index 40a8a4c..2cb00e1 100644
--- a/gobblin-modules/gobblin-kafka-09/src/main/java/org/apache/gobblin/kafka/writer/Kafka09DataWriter.java
+++ b/gobblin-modules/gobblin-kafka-09/src/main/java/org/apache/gobblin/kafka/writer/Kafka09DataWriter.java
@@ -76,15 +76,14 @@ public class Kafka09DataWriter<D> implements AsyncDataWriter<D> {
   private final Producer<String, D> producer;
   private final String topic;
 
-  public static Producer getKafkaProducer(Properties props)
-  {
+  public static Producer getKafkaProducer(Properties props) {
     Object producerObject = KafkaWriterHelper.getKafkaProducer(props);
-    try
-    {
+    try {
       Producer producer = (Producer) producerObject;
       return producer;
     } catch (ClassCastException e) {
-      log.error("Failed to instantiate Kafka producer " + producerObject.getClass().getName() + " as instance of Producer.class", e);
+      log.error("Failed to instantiate Kafka producer " + producerObject.getClass().getName()
+          + " as instance of Producer.class", e);
       throw Throwables.propagate(e);
     }
   }
@@ -93,8 +92,7 @@ public class Kafka09DataWriter<D> implements AsyncDataWriter<D> {
     this(getKafkaProducer(props), ConfigFactory.parseProperties(props));
   }
 
-  public Kafka09DataWriter(Producer producer, Config config)
-  {
+  public Kafka09DataWriter(Producer producer, Config config) {
     this.topic = config.getString(KafkaWriterConfigurationKeys.KAFKA_TOPIC);
     this.producer = producer;
   }
@@ -106,7 +104,6 @@ public class Kafka09DataWriter<D> implements AsyncDataWriter<D> {
     this.producer.close();
   }
 
-
   @Override
   public Future<WriteResponse> write(final D record, final WriteCallback callback) {
     return new WriteResponseFuture<>(this.producer.send(new ProducerRecord<String, D>(topic, record), new Callback() {

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/90d8495a/gobblin-modules/gobblin-kafka-09/src/main/java/org/apache/gobblin/kafka/writer/Kafka09JsonObjectWriterBuilder.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-kafka-09/src/main/java/org/apache/gobblin/kafka/writer/Kafka09JsonObjectWriterBuilder.java b/gobblin-modules/gobblin-kafka-09/src/main/java/org/apache/gobblin/kafka/writer/Kafka09JsonObjectWriterBuilder.java
new file mode 100644
index 0000000..c97c486
--- /dev/null
+++ b/gobblin-modules/gobblin-kafka-09/src/main/java/org/apache/gobblin/kafka/writer/Kafka09JsonObjectWriterBuilder.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.kafka.writer;
+
+import java.util.Properties;
+
+import org.apache.gobblin.kafka.serialize.GsonSerializerBase;
+import org.apache.gobblin.writer.AsyncDataWriter;
+import org.apache.kafka.common.serialization.Serializer;
+
+import com.google.gson.JsonArray;
+import com.google.gson.JsonObject;
+
+
+/**
+ * A {@link org.apache.gobblin.writer.DataWriterBuilder} that builds a {@link org.apache.gobblin.writer.DataWriter} to
+ * write {@link JsonObject} to kafka
+ */
+public class Kafka09JsonObjectWriterBuilder extends AbstractKafkaDataWriterBuilder<JsonArray, JsonObject> {
+  private static final String VALUE_SERIALIZER_KEY =
+      KafkaWriterConfigurationKeys.KAFKA_PRODUCER_CONFIG_PREFIX + KafkaWriterConfigurationKeys.VALUE_SERIALIZER_CONFIG;
+
+  @Override
+  protected AsyncDataWriter<JsonObject> getAsyncDataWriter(Properties props) {
+    props.setProperty(VALUE_SERIALIZER_KEY, KafkaGsonObjectSerializer.class.getName());
+    return new Kafka09DataWriter<>(props);
+  }
+
+  /**
+   * A specific {@link Serializer} that serializes {@link JsonObject} to byte array
+   */
+  public final static class KafkaGsonObjectSerializer extends GsonSerializerBase<JsonObject> implements Serializer<JsonObject> {
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/90d8495a/gobblin-modules/gobblin-kafka-09/src/main/java/org/apache/gobblin/kafka/writer/KafkaDataWriterBuilder.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-kafka-09/src/main/java/org/apache/gobblin/kafka/writer/KafkaDataWriterBuilder.java b/gobblin-modules/gobblin-kafka-09/src/main/java/org/apache/gobblin/kafka/writer/KafkaDataWriterBuilder.java
index 754f5a4..8869c43 100644
--- a/gobblin-modules/gobblin-kafka-09/src/main/java/org/apache/gobblin/kafka/writer/KafkaDataWriterBuilder.java
+++ b/gobblin-modules/gobblin-kafka-09/src/main/java/org/apache/gobblin/kafka/writer/KafkaDataWriterBuilder.java
@@ -19,6 +19,7 @@ package org.apache.gobblin.kafka.writer;
 
 import java.util.Properties;
 
+import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericRecord;
 
 import org.apache.gobblin.writer.AsyncDataWriter;
@@ -27,7 +28,7 @@ import org.apache.gobblin.writer.AsyncDataWriter;
 /**
  * Builder that hands back a {@link Kafka09DataWriter}
  */
-public class KafkaDataWriterBuilder extends BaseKafkaDataWriterBuilder {
+public class KafkaDataWriterBuilder extends AbstractKafkaDataWriterBuilder<Schema, GenericRecord> {
   @Override
   protected AsyncDataWriter<GenericRecord> getAsyncDataWriter(Properties props) {
     return new Kafka09DataWriter<>(props);

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/90d8495a/gobblin-modules/gobblin-kafka-09/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/Kafka09JsonSource.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-kafka-09/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/Kafka09JsonSource.java b/gobblin-modules/gobblin-kafka-09/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/Kafka09JsonSource.java
new file mode 100644
index 0000000..772ade7
--- /dev/null
+++ b/gobblin-modules/gobblin-kafka-09/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/Kafka09JsonSource.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.source.extractor.extract.kafka;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.kafka.common.serialization.Deserializer;
+
+import com.google.gson.JsonArray;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonParser;
+
+import org.apache.gobblin.configuration.SourceState;
+import org.apache.gobblin.configuration.WorkUnitState;
+import org.apache.gobblin.kafka.client.ByteArrayBasedKafkaRecord;
+import org.apache.gobblin.kafka.client.Kafka09ConsumerClient;
+import org.apache.gobblin.kafka.serialize.GsonDeserializerBase;
+import org.apache.gobblin.source.extractor.Extractor;
+import org.apache.gobblin.source.workunit.WorkUnit;
+
+
+/**
+ * A {@link KafkaSource} that reads kafka record as {@link JsonObject}
+ */
+public class Kafka09JsonSource extends KafkaSource<JsonArray, JsonObject> {
+  @Override
+  public List<WorkUnit> getWorkunits(SourceState state) {
+    if (!state.contains(Kafka09ConsumerClient.GOBBLIN_CONFIG_VALUE_DESERIALIZER_CLASS_KEY)) {
+      state.setProp(Kafka09ConsumerClient.GOBBLIN_CONFIG_VALUE_DESERIALIZER_CLASS_KEY,
+          KafkaGsonDeserializer.class.getName());
+    }
+    return super.getWorkunits(state);
+  }
+
+  @Override
+  public Extractor<JsonArray, JsonObject> getExtractor(WorkUnitState state)
+      throws IOException {
+    return new JsonExtractor(state);
+  }
+
+  static final class JsonExtractor extends KafkaExtractor<JsonArray, JsonObject> {
+    private static final String JSON_SCHEMA = "source.kafka.json.schema";
+    private static final JsonParser JSON_PARSER = new JsonParser();
+    private final JsonArray schema;
+
+    JsonExtractor(WorkUnitState state) {
+      super(state);
+      String schemaStr = state.getProp(JSON_SCHEMA);
+      if (StringUtils.isEmpty(schemaStr)) {
+        throw new RuntimeException("Missing configuration: " + JSON_SCHEMA);
+      }
+      this.schema = JSON_PARSER.parse(schemaStr).getAsJsonArray();
+    }
+
+    @Override
+    public JsonArray getSchema()
+        throws IOException {
+      return schema;
+    }
+
+    @Override
+    protected JsonObject decodeRecord(ByteArrayBasedKafkaRecord kafkaConsumerRecord)
+        throws IOException {
+      throw new UnsupportedOperationException();
+    }
+  }
+
+  /**
+   * A specific kafka {@link Deserializer} that deserializes record as JasonObject
+   */
+  public static final class KafkaGsonDeserializer extends GsonDeserializerBase<JsonObject> implements Deserializer<JsonObject> {
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/90d8495a/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/kafka/Kafka09JsonIntegrationTest.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/kafka/Kafka09JsonIntegrationTest.java b/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/kafka/Kafka09JsonIntegrationTest.java
new file mode 100644
index 0000000..5cbd4ea
--- /dev/null
+++ b/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/kafka/Kafka09JsonIntegrationTest.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.kafka;
+
+import java.io.IOException;
+import java.lang.management.ManagementFactory;
+import java.util.List;
+
+import org.testng.Assert;
+import org.testng.annotations.AfterSuite;
+import org.testng.annotations.BeforeSuite;
+import org.testng.annotations.Test;
+
+import com.google.gson.Gson;
+import com.google.gson.JsonArray;
+import com.google.gson.JsonObject;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.configuration.SourceState;
+import org.apache.gobblin.configuration.WorkUnitState;
+import org.apache.gobblin.kafka.client.Kafka09ConsumerClient;
+import org.apache.gobblin.kafka.writer.Kafka09JsonObjectWriterBuilder;
+import org.apache.gobblin.runtime.util.MultiWorkUnitUnpackingIterator;
+import org.apache.gobblin.source.extractor.DataRecordException;
+import org.apache.gobblin.source.extractor.Extractor;
+import org.apache.gobblin.source.extractor.extract.kafka.Kafka09JsonSource;
+import org.apache.gobblin.source.extractor.extract.kafka.KafkaSource;
+import org.apache.gobblin.source.workunit.WorkUnit;
+import org.apache.gobblin.writer.DataWriter;
+import org.apache.gobblin.writer.Destination;
+
+import static org.apache.gobblin.kafka.writer.KafkaWriterConfigurationKeys.KAFKA_PRODUCER_CONFIG_PREFIX;
+import static org.apache.gobblin.kafka.writer.KafkaWriterConfigurationKeys.KAFKA_TOPIC;
+
+
+/**
+ * An integration test for {@link Kafka09JsonSource} and {@link Kafka09JsonObjectWriterBuilder}. The test writes
+ * a json object to kafka with the writer and extracts it with the source
+ */
+@Slf4j
+public class Kafka09JsonIntegrationTest {
+  private final Gson gson;
+  private final KafkaTestBase kafkaTestHelper;
+
+  public Kafka09JsonIntegrationTest()
+      throws InterruptedException, RuntimeException {
+    kafkaTestHelper = new KafkaTestBase();
+    gson = new Gson();
+  }
+
+  @BeforeSuite
+  public void beforeSuite() {
+    log.info("Process id = " + ManagementFactory.getRuntimeMXBean().getName());
+    kafkaTestHelper.startServers();
+  }
+
+  @AfterSuite
+  public void afterSuite()
+      throws IOException {
+    try {
+      kafkaTestHelper.stopClients();
+    } finally {
+      kafkaTestHelper.stopServers();
+    }
+  }
+
+  private SourceState createSourceState(String topic) {
+    SourceState state = new SourceState();
+    state.setProp(ConfigurationKeys.KAFKA_BROKERS, "localhost:" + kafkaTestHelper.getKafkaServerPort());
+    state.setProp(KafkaSource.TOPIC_WHITELIST, topic);
+    state.setProp(KafkaSource.GOBBLIN_KAFKA_CONSUMER_CLIENT_FACTORY_CLASS,
+        Kafka09ConsumerClient.Factory.class.getName());
+    state.setProp(KafkaSource.BOOTSTRAP_WITH_OFFSET, "earliest");
+    return state;
+  }
+
+  @Test
+  public void testHappyPath()
+      throws IOException, DataRecordException {
+    String topic = "testKafka09JsonSource";
+    kafkaTestHelper.provisionTopic(topic);
+    SourceState state = createSourceState(topic);
+
+    // Produce a record
+    state.setProp(KAFKA_PRODUCER_CONFIG_PREFIX + "bootstrap.servers",
+        "localhost:" + kafkaTestHelper.getKafkaServerPort());
+    state.setProp(KAFKA_TOPIC, topic);
+    Destination destination = Destination.of(Destination.DestinationType.KAFKA, state);
+    Kafka09JsonObjectWriterBuilder writerBuilder = new Kafka09JsonObjectWriterBuilder();
+    writerBuilder.writeTo(destination);
+    DataWriter<JsonObject> writer = writerBuilder.build();
+
+    final String json = "{\"number\":27}";
+    JsonObject record = gson.fromJson(json, JsonObject.class);
+    writer.write(record);
+    writer.flush();
+    writer.close();
+
+    Kafka09JsonSource source = new Kafka09JsonSource();
+    List<WorkUnit> workUnitList = source.getWorkunits(state);
+    // Test the right value serializer is set
+    Assert.assertEquals(state.getProp(Kafka09ConsumerClient.GOBBLIN_CONFIG_VALUE_DESERIALIZER_CLASS_KEY),
+        Kafka09JsonSource.KafkaGsonDeserializer.class.getName());
+
+    // Test there is only one non-empty work unit
+    MultiWorkUnitUnpackingIterator iterator = new MultiWorkUnitUnpackingIterator(workUnitList.iterator());
+    Assert.assertTrue(iterator.hasNext());
+    WorkUnit workUnit = iterator.next();
+    Assert.assertEquals(workUnit.getProp(ConfigurationKeys.EXTRACT_TABLE_NAME_KEY), topic);
+    Assert.assertFalse(iterator.hasNext());
+
+    // Test extractor
+    WorkUnitState workUnitState = new WorkUnitState(workUnit, state);
+
+    final String jsonSchema =
+        "[{\"columnName\":\"number\",\"comment\":\"\",\"isNullable\":\"false\",\"dataType\":{\"type\":\"int\"}}]";
+    workUnitState.setProp("source.kafka.json.schema", jsonSchema);
+
+    Extractor<JsonArray, JsonObject> extractor = source.getExtractor(workUnitState);
+    Assert.assertEquals(extractor.getSchema().toString(), jsonSchema);
+    Assert.assertEquals(extractor.readRecord(null).toString(), json);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/90d8495a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/serialize/GsonDeserializerBase.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/serialize/GsonDeserializerBase.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/serialize/GsonDeserializerBase.java
new file mode 100644
index 0000000..58c59ee
--- /dev/null
+++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/serialize/GsonDeserializerBase.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.kafka.serialize;
+
+import java.nio.charset.StandardCharsets;
+import java.util.Map;
+
+import com.google.gson.Gson;
+import com.google.gson.JsonElement;
+
+
+/**
+ * Base kafka Gson deserializer, which deserializes a json string to a {@link JsonElement}
+ */
+public class GsonDeserializerBase<T extends JsonElement> {
+  private static final Gson GSON = new Gson();
+
+  public void configure(Map<String, ?> configs, boolean isKey) {
+    // Do nothing
+  }
+
+  public T deserialize(String topic, byte[] data) {
+    return (T) GSON.fromJson(new String(data, StandardCharsets.UTF_8), JsonElement.class);
+  }
+
+  public void close() {
+    // Do nothing
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/90d8495a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/serialize/GsonSerializerBase.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/serialize/GsonSerializerBase.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/serialize/GsonSerializerBase.java
new file mode 100644
index 0000000..23c4535
--- /dev/null
+++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/serialize/GsonSerializerBase.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.kafka.serialize;
+
+import java.nio.charset.StandardCharsets;
+import java.util.Map;
+
+import com.google.gson.JsonElement;
+
+
+/**
+ * Base kafka GSON serializer, which serializes json data into string encoded with
+ * {@link StandardCharsets#UTF_8}
+ */
+public class GsonSerializerBase<T extends JsonElement> {
+  public void configure(Map<String, ?> configs, boolean isKey) {
+    // Do nothing by default
+  }
+
+  public byte[] serialize(String topic, T data) {
+    if (data == null) {
+      return null;
+    } else {
+      return data.toString().getBytes(StandardCharsets.UTF_8);
+    }
+  }
+
+  public void close() {
+    // Nothing to close
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/90d8495a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/writer/AbstractKafkaDataWriterBuilder.java
----------------------------------------------------------------------
diff --git 
a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/writer/AbstractKafkaDataWriterBuilder.java
 
b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/writer/AbstractKafkaDataWriterBuilder.java
new file mode 100644
index 0000000..a3db916
--- /dev/null
+++ 
b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/writer/AbstractKafkaDataWriterBuilder.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.kafka.writer;
+
+import java.io.IOException;
+import java.util.Properties;
+
+import com.typesafe.config.Config;
+
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.util.ConfigUtils;
+import org.apache.gobblin.writer.AsyncDataWriter;
+import org.apache.gobblin.writer.AsyncWriterManager;
+import org.apache.gobblin.writer.DataWriter;
+import org.apache.gobblin.writer.DataWriterBuilder;
+
+
+/**
+ * Base Kafka data writer builder: subclasses supply the {@link AsyncDataWriter}, and this class
+ * wraps it in an {@link AsyncWriterManager} that is returned as the {@link DataWriter}.
+ */
+public abstract class AbstractKafkaDataWriterBuilder<S, D> extends DataWriterBuilder<S, D> {
+  /** Creates the underlying async writer from the task properties {@code props}. */
+  protected abstract AsyncDataWriter<D> getAsyncDataWriter(Properties props);
+
+  /**
+   * Builds a {@link DataWriter} backed by an {@link AsyncWriterManager} with retries disabled.
+   *
+   * @return the built {@link DataWriter}
+   * @throws IOException if there is anything wrong building the writer
+   */
+  @Override
+  public DataWriter<D> build() throws IOException {
+    State state = this.destination.getProperties();
+    Properties taskProps = state.getProperties();
+    Config config = ConfigUtils.propertiesToConfig(taskProps);
+    long timeoutMs = ConfigUtils.getLong(config, KafkaWriterConfigurationKeys.COMMIT_TIMEOUT_MILLIS_CONFIG,
+        KafkaWriterConfigurationKeys.COMMIT_TIMEOUT_MILLIS_DEFAULT);
+    long stepWaitMs = ConfigUtils.getLong(config, KafkaWriterConfigurationKeys.COMMIT_STEP_WAIT_TIME_CONFIG,
+        KafkaWriterConfigurationKeys.COMMIT_STEP_WAIT_TIME_DEFAULT);
+    // The allowance is configured as a percentage, while the manager takes a ratio
+    double allowancePct = ConfigUtils.getDouble(config, KafkaWriterConfigurationKeys.FAILURE_ALLOWANCE_PCT_CONFIG,
+        KafkaWriterConfigurationKeys.FAILURE_ALLOWANCE_PCT_DEFAULT);
+    return AsyncWriterManager.builder()
+        .config(config)
+        .commitTimeoutMillis(timeoutMs)
+        .commitStepWaitTimeInMillis(stepWaitMs)
+        .failureAllowanceRatio(allowancePct / 100.0)
+        .retriesEnabled(false)
+        .asyncDataWriter(getAsyncDataWriter(taskProps))
+        .build();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/90d8495a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/writer/BaseKafkaDataWriterBuilder.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/writer/BaseKafkaDataWriterBuilder.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/writer/BaseKafkaDataWriterBuilder.java
index 8e5e9b0..c5b2f95 100644
--- a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/writer/BaseKafkaDataWriterBuilder.java
+++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/writer/BaseKafkaDataWriterBuilder.java
@@ -17,57 +17,15 @@
 
 package org.apache.gobblin.kafka.writer;
 
-import java.io.IOException;
-import java.util.Properties;
-
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericRecord;
 
-import com.typesafe.config.Config;
-
-import org.apache.gobblin.configuration.State;
-import org.apache.gobblin.util.ConfigUtils;
-import org.apache.gobblin.writer.AsyncWriterManager;
-import org.apache.gobblin.writer.AsyncDataWriter;
-import org.apache.gobblin.writer.DataWriter;
-import org.apache.gobblin.writer.DataWriterBuilder;
-
 
 /**
  * Base class for creating KafkaDataWriter builders.
+ *
+ * <p>It declares no members of its own: it only binds the type parameters of
+ * {@link AbstractKafkaDataWriterBuilder} to Avro {@link Schema} and {@link GenericRecord}.
+ *
+ * @deprecated Use {@link AbstractKafkaDataWriterBuilder}
  */
-
-public abstract class BaseKafkaDataWriterBuilder extends 
DataWriterBuilder<Schema, GenericRecord> {
-
-  protected abstract AsyncDataWriter<GenericRecord> 
getAsyncDataWriter(Properties props);
-
-  /**
-   * Build a {@link DataWriter}.
-   *
-   * @throws IOException if there is anything wrong building the writer
-   * @return the built {@link DataWriter}
-   */
-  @Override
-  public DataWriter<GenericRecord> build()
-      throws IOException {
-    State state = this.destination.getProperties();
-    Properties taskProps = state.getProperties();
-    Config config = ConfigUtils.propertiesToConfig(taskProps);
-    long commitTimeoutMillis = ConfigUtils.getLong(config, 
KafkaWriterConfigurationKeys.COMMIT_TIMEOUT_MILLIS_CONFIG,
-        KafkaWriterConfigurationKeys.COMMIT_TIMEOUT_MILLIS_DEFAULT);
-    long commitStepWaitTimeMillis = ConfigUtils.getLong(config, 
KafkaWriterConfigurationKeys.COMMIT_STEP_WAIT_TIME_CONFIG,
-        KafkaWriterConfigurationKeys.COMMIT_STEP_WAIT_TIME_DEFAULT);
-    double failureAllowance = ConfigUtils.getDouble(config, 
KafkaWriterConfigurationKeys.FAILURE_ALLOWANCE_PCT_CONFIG,
-        KafkaWriterConfigurationKeys.FAILURE_ALLOWANCE_PCT_DEFAULT) / 100.0;
-
-    return AsyncWriterManager.builder()
-        .config(config)
-        .commitTimeoutMillis(commitTimeoutMillis)
-        .commitStepWaitTimeInMillis(commitStepWaitTimeMillis)
-        .failureAllowanceRatio(failureAllowance)
-        .retriesEnabled(false)
-        .asyncDataWriter(getAsyncDataWriter(taskProps))
-        .build();
-  }
-
+@Deprecated
+public abstract class BaseKafkaDataWriterBuilder extends 
AbstractKafkaDataWriterBuilder<Schema, GenericRecord> {
 }

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/90d8495a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/writer/KafkaWriterHelper.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/writer/KafkaWriterHelper.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/writer/KafkaWriterHelper.java
index 3f52645..28da311 100644
--- a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/writer/KafkaWriterHelper.java
+++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/writer/KafkaWriterHelper.java
@@ -40,8 +40,7 @@ import static org.apache.gobblin.kafka.writer.KafkaWriterConfigurationKeys.CLIEN
 @Slf4j
 public class KafkaWriterHelper {
 
-  static Properties getProducerProperties(Properties props)
-  {
+  static Properties getProducerProperties(Properties props) {
     Properties producerProperties = stripPrefix(props, 
KAFKA_PRODUCER_CONFIG_PREFIX);
 
     // Provide default properties if not set from above
@@ -52,8 +51,7 @@ public class KafkaWriterHelper {
     return producerProperties;
   }
 
-  private static void setDefaultIfUnset(Properties props, String key, String 
value)
-  {
+  private static void setDefaultIfUnset(Properties props, String key, String 
value) {
     if (!props.containsKey(key)) {
       props.setProperty(key, value);
     }
@@ -62,22 +60,18 @@ public class KafkaWriterHelper {
+  /**
+   * Returns a new {@link Properties} with the properties of {@code props} (as enumerated by
+   * {@link Properties#stringPropertyNames()}) whose names start with {@code prefix}, re-keyed with
+   * that prefix removed.
+   */
   private static Properties stripPrefix(Properties props, String prefix) {
     Properties strippedProps = new Properties();
     int prefixLength = prefix.length();
-    for (String key: props.stringPropertyNames())
-    {
-      if (key.startsWith(prefix))
-      {
+    for (String key : props.stringPropertyNames()) {
+      if (key.startsWith(prefix)) {
         strippedProps.setProperty(key.substring(prefixLength), 
props.getProperty(key));
       }
     }
     return strippedProps;
   }
 
-  public static Object getKafkaProducer(Properties props)
-  {
+  public static Object getKafkaProducer(Properties props) {
     Config config = ConfigFactory.parseProperties(props);
-    String kafkaProducerClass = ConfigUtils
-        .getString(config, 
KafkaWriterConfigurationKeys.KAFKA_WRITER_PRODUCER_CLASS,
-            KafkaWriterConfigurationKeys.KAFKA_WRITER_PRODUCER_CLASS_DEFAULT);
+    String kafkaProducerClass = ConfigUtils.getString(config, 
KafkaWriterConfigurationKeys.KAFKA_WRITER_PRODUCER_CLASS,
+        KafkaWriterConfigurationKeys.KAFKA_WRITER_PRODUCER_CLASS_DEFAULT);
     Properties producerProps = getProducerProperties(props);
     try {
       Class<?> producerClass = (Class<?>) Class.forName(kafkaProducerClass);
@@ -88,6 +82,4 @@ public class KafkaWriterHelper {
       throw Throwables.propagate(e);
     }
   }
-
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/90d8495a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/util/MultiWorkUnitUnpackingIterator.java
----------------------------------------------------------------------
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/util/MultiWorkUnitUnpackingIterator.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/util/MultiWorkUnitUnpackingIterator.java
index 38e4ce7..348fb28 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/util/MultiWorkUnitUnpackingIterator.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/util/MultiWorkUnitUnpackingIterator.java
@@ -31,29 +31,59 @@ import lombok.RequiredArgsConstructor;
 @RequiredArgsConstructor
 public class MultiWorkUnitUnpackingIterator implements Iterator<WorkUnit> {
   private final Iterator<WorkUnit> workUnits;
+
+  /** Iterator over the children of {@link #nextWu} when it is a {@link MultiWorkUnit} */
   private Iterator<WorkUnit> currentIterator;
+  /**
+   * The top-level work unit found by the last seek: returned as-is by {@link #next()}, or, if it is
+   * a {@link MultiWorkUnit}, the parent whose children {@link #next()} returns one at a time;
+   * {@code null} when no work unit was found
+   */
+  private WorkUnit nextWu;
+  /** Whether a new seek is needed before serving a work unit; set once {@link #next()} consumes one */
+  private boolean needSeek = true;
 
+  /**
+   * Returns whether {@link #next()} has another work unit to return. Seeks ahead if needed,
+   * skipping empty {@link MultiWorkUnit}s.
+   */
   @Override
   public boolean hasNext() {
-    return this.workUnits.hasNext() || (this.currentIterator != null && 
this.currentIterator.hasNext());
+    seekNext();
+    return nextWu != null;
   }
 
   @Override
   public WorkUnit next() {
+    // In case, the caller forgets to call hasNext()
+    seekNext();
+
+    WorkUnit wu = nextWu;
+    if (nextWu instanceof MultiWorkUnit) {
+      wu = this.currentIterator.next();
+    }
+    needSeek = true;
+    return wu;
+  }
+
+  /**
+   * Positions {@link #nextWu} on the next work unit to serve, skipping empty {@link MultiWorkUnit}s
+   * and {@code null} elements. Does nothing while a previously found work unit has not yet been
+   * consumed by {@link #next()}.
+   */
+  private void seekNext() {
+    if (!needSeek) {
+      return;
+    }
+
+    // First, keep serving the children of the current MultiWorkUnit while any remain
     if (this.currentIterator != null && this.currentIterator.hasNext()) {
-      WorkUnit next = this.currentIterator.next();
-      if (next instanceof MultiWorkUnit) {
-        throw new IllegalStateException("A MultiWorkUnit cannot contain other 
MultiWorkUnits.");
-      }
-      return next;
+      needSeek = false;
+      return;
     }
-    WorkUnit wu = this.workUnits.next();
-    if (wu instanceof MultiWorkUnit) {
-      this.currentIterator = ((MultiWorkUnit) wu).getWorkUnits().iterator();
-      return next();
-    } else {
-      return wu;
+
+    // Then, advance through the top-level work units until a plain work unit or a non-empty
+    // MultiWorkUnit is found; nextWu stays null if the top-level iterator is exhausted
+    nextWu = null;
+    this.currentIterator = null;
+    while (nextWu == null && workUnits.hasNext()) {
+      nextWu = workUnits.next();
+      if (nextWu instanceof MultiWorkUnit) {
+        this.currentIterator = ((MultiWorkUnit) 
nextWu).getWorkUnits().iterator();
+        if (!this.currentIterator.hasNext()) {
+          // An empty MultiWorkUnit has nothing to serve; keep looking
+          nextWu = null;
+        }
+      }
     }
+
+    // Remember the result until next() consumes it
+    needSeek = false;
   }
 
   @Override

Reply via email to