Repository: incubator-gobblin
Updated Branches:
  refs/heads/master 1447f2d3b -> b39bf8cab


[GOBBLIN-464] Enhance LoopingDatasetFinderSource to support global watermark 
and per-dataset watermark.

Closes #2336 from sv2000/loopingwm


Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/b39bf8ca
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/b39bf8ca
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/b39bf8ca

Branch: refs/heads/master
Commit: b39bf8cabcabe7864719f4121ccffa09d9f6c08a
Parents: 1447f2d
Author: suvasude <[email protected]>
Authored: Sat Apr 21 13:54:27 2018 -0700
Committer: Hung Tran <[email protected]>
Committed: Sat Apr 21 13:54:27 2018 -0700

----------------------------------------------------------------------
 .../CombinedWorkUnitAndDatasetState.java        |  34 ++
 ...mbinedWorkUnitAndDatasetStateFunctional.java |  33 ++
 .../configuration/ConfigurationKeys.java        |   1 +
 .../gobblin/configuration/SourceState.java      | 113 +++-
 .../source/LoopingDatasetFinderSource.java      | 139 +++--
 .../source/LoopingDatasetFinderSourceTest.java  | 546 ++++++++++++++++---
 .../gobblin/runtime/ZkDatasetStateStore.java    |   2 +-
 ...ombinedWorkUnitAndDatasetStateGenerator.java |  71 +++
 .../gobblin/runtime/FsDatasetStateStore.java    |  44 +-
 .../org/apache/gobblin/runtime/JobContext.java  |  33 +-
 .../org/apache/gobblin/runtime/JobState.java    |  11 +-
 .../gobblin/runtime/MysqlDatasetStateStore.java |   2 +-
 12 files changed, 867 insertions(+), 162 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/b39bf8ca/gobblin-api/src/main/java/org/apache/gobblin/configuration/CombinedWorkUnitAndDatasetState.java
----------------------------------------------------------------------
diff --git 
a/gobblin-api/src/main/java/org/apache/gobblin/configuration/CombinedWorkUnitAndDatasetState.java
 
b/gobblin-api/src/main/java/org/apache/gobblin/configuration/CombinedWorkUnitAndDatasetState.java
new file mode 100644
index 0000000..9b2743c
--- /dev/null
+++ 
b/gobblin-api/src/main/java/org/apache/gobblin/configuration/CombinedWorkUnitAndDatasetState.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.configuration;
+
+import java.util.List;
+import java.util.Map;
+
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+
+
+/**
+ * A class that encapsulates {@link WorkUnitState} and DatasetStates.
+ */
+@AllArgsConstructor
+@Getter
+public class CombinedWorkUnitAndDatasetState {
+    private List<WorkUnitState> previousWorkUnitStates;
+    private Map<String, ? extends SourceState> previousDatasetStatesByUrns;
+}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/b39bf8ca/gobblin-api/src/main/java/org/apache/gobblin/configuration/CombinedWorkUnitAndDatasetStateFunctional.java
----------------------------------------------------------------------
diff --git 
a/gobblin-api/src/main/java/org/apache/gobblin/configuration/CombinedWorkUnitAndDatasetStateFunctional.java
 
b/gobblin-api/src/main/java/org/apache/gobblin/configuration/CombinedWorkUnitAndDatasetStateFunctional.java
new file mode 100644
index 0000000..9543e00
--- /dev/null
+++ 
b/gobblin-api/src/main/java/org/apache/gobblin/configuration/CombinedWorkUnitAndDatasetStateFunctional.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.configuration;
+
+/**
+ * A {@link FunctionalInterface} to return {@link 
CombinedWorkUnitAndDatasetState}.
+ */
+@FunctionalInterface
+public interface CombinedWorkUnitAndDatasetStateFunctional {
+  /**
+   *
+   * @param datasetUrn the dataset urn
+   * @return an instance of {@link CombinedWorkUnitAndDatasetState}. If 
datasetUrn is null or empty, then return latest {@link WorkUnitState}s and 
DatasetStates of all datasetUrns in the state store; else return the previous 
{@link WorkUnitState} and the DatasetState of the specified datasetUrn.
+   * @throws Exception the exception
+   */
+  public CombinedWorkUnitAndDatasetState 
getCombinedWorkUnitAndDatasetState(String datasetUrn)
+      throws Exception;
+}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/b39bf8ca/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
----------------------------------------------------------------------
diff --git 
a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
 
b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
index 8e2d225..a0eeca3 100644
--- 
a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
+++ 
b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
@@ -246,6 +246,7 @@ public class ConfigurationKeys {
    */
   // This property is used to specify the URN of a dataset a job or WorkUnit 
extracts data for
   public static final String DATASET_URN_KEY = "dataset.urn";
+  public static final String 
GLOBAL_WATERMARK_DATASET_URN="__globalDatasetWatermark";
   public static final String DEFAULT_DATASET_URN = "";
 
   /**

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/b39bf8ca/gobblin-api/src/main/java/org/apache/gobblin/configuration/SourceState.java
----------------------------------------------------------------------
diff --git 
a/gobblin-api/src/main/java/org/apache/gobblin/configuration/SourceState.java 
b/gobblin-api/src/main/java/org/apache/gobblin/configuration/SourceState.java
index fd772ea..9c08a5d 100644
--- 
a/gobblin-api/src/main/java/org/apache/gobblin/configuration/SourceState.java
+++ 
b/gobblin-api/src/main/java/org/apache/gobblin/configuration/SourceState.java
@@ -20,6 +20,7 @@ package org.apache.gobblin.configuration;
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Locale;
 import java.util.Map;
@@ -64,19 +65,36 @@ public class SourceState extends State {
   private static final DateTimeFormatter DTF =
       
DateTimeFormat.forPattern("yyyyMMddHHmmss").withLocale(Locale.US).withZone(DateTimeZone.UTC);
 
+  private Map<String, SourceState> previousDatasetStatesByUrns;
+
+  private List<WorkUnitState> previousWorkUnitStates = Lists.newArrayList();
+
   @Getter
-  private final Map<String, SourceState> previousDatasetStatesByUrns;
+  @Setter
+  private SharedResourcesBroker<GobblinScopeTypes> broker;
 
   @Getter
-  private final List<WorkUnitState> previousWorkUnitStates = 
Lists.newArrayList();
+  @Setter
+  private CombinedWorkUnitAndDatasetStateFunctional 
workUnitAndDatasetStateFunctional;
 
-  @Getter @Setter
-  private SharedResourcesBroker<GobblinScopeTypes> broker;
+  private boolean areWorkUnitStatesMaterialized;
 
   /**
    * Default constructor.
    */
   public SourceState() {
+    this.previousWorkUnitStates = new ArrayList<>();
+    this.previousDatasetStatesByUrns = ImmutableMap.of();
+  }
+
+  /**
+   * Constructor.
+   *
+   * @param properties job configuration properties
+   */
+  public SourceState(State properties) {
+    super.addAll(properties);
+    this.previousWorkUnitStates = new ArrayList<>();
     this.previousDatasetStatesByUrns = ImmutableMap.of();
   }
 
@@ -127,7 +145,8 @@ public class SourceState extends State {
 
   /**
    * Get the state (in the form of a {@link SourceState}) of a dataset 
identified by a dataset URN
-   * of the previous job run.
+   * of the previous job run. Useful when dataset state store is enabled and 
we want to load the latest
+   * state of a global watermark dataset.
    *
    * @param datasetUrn the dataset URN
    * @return the dataset state (in the form of a {@link SourceState}) of the 
previous job run
@@ -141,6 +160,54 @@ public class SourceState extends State {
   }
 
   /**
+   *
+   * Get a {@link Map} from dataset URNs (as specified by {@link 
ConfigurationKeys#DATASET_URN_KEY})
+   * to the {@link SourceState} with the dataset URNs. The map is materialized 
upon invocation of the method
+   * by the source. Subsequent calls to this method will return the previously 
materialized map.
+   * <p>
+   *   {@link SourceState}s that do not have {@link 
ConfigurationKeys#DATASET_URN_KEY} set will be added
+   *   to the dataset state belonging to {@link 
ConfigurationKeys#DEFAULT_DATASET_URN}.
+   * </p>
+   *
+   * @return a {@link Map} from dataset URNs to the {@link SourceState} with 
the dataset URNs
+   */
+  public Map<String, SourceState> getPreviousDatasetStatesByUrns() {
+    if (this.workUnitAndDatasetStateFunctional != null) {
+      materializeWorkUnitAndDatasetStates(null);
+    }
+    return this.previousDatasetStatesByUrns;
+  }
+
+  /**
+   * Get a {@link List} of previous {@link WorkUnitState}s. The list is lazily 
materialized upon invocation of the
+   * method by the {@link org.apache.gobblin.source.Source}. Subsequent calls 
to this method will return the previously
+   * materialized list.
+   */
+  public List<WorkUnitState> getPreviousWorkUnitStates() {
+    if (this.workUnitAndDatasetStateFunctional != null) {
+      materializeWorkUnitAndDatasetStates(null);
+    }
+    return this.previousWorkUnitStates;
+  }
+
+  /**
+   * Get a {@link List} of previous {@link WorkUnitState}s for a given 
datasetUrn.
+   * @param datasetUrn the dataset URN whose previous {@link WorkUnitState}s to retrieve
+   * @return {@link List} of {@link WorkUnitState}s.
+   */
+  public List<WorkUnitState> getPreviousWorkUnitStates(String datasetUrn) {
+    if (this.workUnitAndDatasetStateFunctional != null) {
+      try {
+        CombinedWorkUnitAndDatasetState state = 
this.workUnitAndDatasetStateFunctional.getCombinedWorkUnitAndDatasetState(datasetUrn);
+        return state.getPreviousWorkUnitStates();
+      } catch (Exception e) {
+        throw new RuntimeException(e);
+      }
+    }
+    return this.previousWorkUnitStates;
+  }
+
+  /**
    * Get a {@link Map} from dataset URNs (as being specified by {@link 
ConfigurationKeys#DATASET_URN_KEY}
    * to the {@link WorkUnitState} with the dataset URNs.
    *
@@ -153,12 +220,14 @@ public class SourceState extends State {
    */
   public Map<String, Iterable<WorkUnitState>> 
getPreviousWorkUnitStatesByDatasetUrns() {
     Map<String, Iterable<WorkUnitState>> previousWorkUnitStatesByDatasetUrns = 
Maps.newHashMap();
-
+    if (this.workUnitAndDatasetStateFunctional != null) {
+      materializeWorkUnitAndDatasetStates(null);
+    }
     for (WorkUnitState workUnitState : this.previousWorkUnitStates) {
       String datasetUrn =
           workUnitState.getProp(ConfigurationKeys.DATASET_URN_KEY, 
ConfigurationKeys.DEFAULT_DATASET_URN);
       if (!previousWorkUnitStatesByDatasetUrns.containsKey(datasetUrn)) {
-        previousWorkUnitStatesByDatasetUrns.put(datasetUrn, 
Lists.<WorkUnitState> newArrayList());
+        previousWorkUnitStatesByDatasetUrns.put(datasetUrn, 
Lists.newArrayList());
       }
       ((List<WorkUnitState>) 
previousWorkUnitStatesByDatasetUrns.get(datasetUrn)).add(workUnitState);
     }
@@ -167,6 +236,25 @@ public class SourceState extends State {
   }
 
   /**
+   * A thread-safe method for materializing previous {@link WorkUnitState}s 
and DatasetStates.
+   * @param datasetUrn the dataset URN; if null or empty, states for all dataset URNs are materialized
+   */
+  private synchronized void materializeWorkUnitAndDatasetStates(String 
datasetUrn) {
+    if (!this.areWorkUnitStatesMaterialized) {
+      try {
+        CombinedWorkUnitAndDatasetState workUnitAndDatasetState =
+            
this.workUnitAndDatasetStateFunctional.getCombinedWorkUnitAndDatasetState(datasetUrn);
+        this.previousWorkUnitStates = 
workUnitAndDatasetState.getPreviousWorkUnitStates();
+        this.previousDatasetStatesByUrns =
+            (Map<String, SourceState>) 
workUnitAndDatasetState.getPreviousDatasetStatesByUrns();
+        this.areWorkUnitStatesMaterialized = true;
+      } catch (Exception e) {
+        throw new RuntimeException(e);
+      }
+    }
+  }
+
+  /**
    * Create a new properly populated {@link Extract} instance.
    *
    * <p>
@@ -179,7 +267,7 @@ public class SourceState extends State {
    * @return a new unique {@link Extract} instance
    *
    * @Deprecated Use {@link 
org.apache.gobblin.source.extractor.extract.AbstractSource#createExtract(
-   * org.apache.gobblin.source.workunit.Extract.TableType, String, String)}
+   *org.apache.gobblin.source.workunit.Extract.TableType, String, String)}
    */
   @Deprecated
   public synchronized Extract createExtract(Extract.TableType type, String 
namespace, String table) {
@@ -211,7 +299,8 @@ public class SourceState extends State {
   }
 
   @Override
-  public void write(DataOutput out) throws IOException {
+  public void write(DataOutput out)
+      throws IOException {
     out.writeInt(this.previousWorkUnitStates.size());
     for (WorkUnitState state : this.previousWorkUnitStates) {
       state.write(out);
@@ -220,7 +309,8 @@ public class SourceState extends State {
   }
 
   @Override
-  public void readFields(DataInput in) throws IOException {
+  public void readFields(DataInput in)
+      throws IOException {
     int size = in.readInt();
     for (int i = 0; i < size; i++) {
       WorkUnitState workUnitState = new WorkUnitState();
@@ -261,7 +351,8 @@ public class SourceState extends State {
     }
 
     @Override
-    public void readFields(DataInput in) throws IOException {
+    public void readFields(DataInput in)
+        throws IOException {
       throw new UnsupportedOperationException();
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/b39bf8ca/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/source/LoopingDatasetFinderSource.java
----------------------------------------------------------------------
diff --git 
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/source/LoopingDatasetFinderSource.java
 
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/source/LoopingDatasetFinderSource.java
index 4ca0dcb..355f463 100644
--- 
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/source/LoopingDatasetFinderSource.java
+++ 
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/source/LoopingDatasetFinderSource.java
@@ -14,7 +14,6 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.gobblin.data.management.source;
 
 import java.io.IOException;
@@ -25,6 +24,15 @@ import java.util.Spliterator;
 import java.util.stream.Stream;
 import java.util.stream.StreamSupport;
 
+import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.AbstractIterator;
+import com.google.common.collect.Iterators;
+import com.google.common.collect.Lists;
+import com.google.common.collect.PeekingIterator;
+import com.typesafe.config.Config;
+
+import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.configuration.SourceState;
 import org.apache.gobblin.configuration.WorkUnitState;
 import org.apache.gobblin.dataset.Dataset;
@@ -36,16 +44,11 @@ import org.apache.gobblin.runtime.task.NoopTask;
 import org.apache.gobblin.source.workunit.BasicWorkUnitStream;
 import org.apache.gobblin.source.workunit.WorkUnit;
 import org.apache.gobblin.source.workunit.WorkUnitStream;
-
-import com.google.common.collect.AbstractIterator;
-import com.google.common.collect.Iterators;
-import com.google.common.collect.Lists;
-import com.google.common.collect.PeekingIterator;
+import org.apache.gobblin.util.ConfigUtils;
 
 import javax.annotation.Nullable;
 import lombok.extern.slf4j.Slf4j;
 
-
 /**
  * A source that processes datasets generated by a {@link 
org.apache.gobblin.dataset.DatasetsFinder}, processing a few of
  * them each run, and continuing from where it left off in the next run. When 
it is done processing all the datasets, it
@@ -55,16 +58,19 @@ import lombok.extern.slf4j.Slf4j;
  */
 @Slf4j
 public abstract class LoopingDatasetFinderSource<S, D> extends 
DatasetFinderSource<S, D> {
-
-  public static final String MAX_WORK_UNITS_PER_RUN_KEY = 
"gobblin.source.loopingDatasetFinderSource.maxWorkUnitsPerRun";
+  public static final String MAX_WORK_UNITS_PER_RUN_KEY =
+      "gobblin.source.loopingDatasetFinderSource.maxWorkUnitsPerRun";
   public static final int MAX_WORK_UNITS_PER_RUN = 10;
+  public static final String DATASET_PARTITION_DELIMITER = "@";
 
-  private static final String DATASET_URN = 
"gobblin.source.loopingDatasetFinderSource.datasetUrn";
-  private static final String PARTITION_URN = 
"gobblin.source.loopingDatasetFinderSource.partitionUrn";
-  private static final String WORK_UNIT_ORDINAL = 
"gobblin.source.loopingDatasetFinderSource.workUnitOrdinal";
+  protected static final String DATASET_URN = 
"gobblin.source.loopingDatasetFinderSource.datasetUrn";
+  protected static final String PARTITION_URN = 
"gobblin.source.loopingDatasetFinderSource.partitionUrn";
   protected static final String END_OF_DATASETS_KEY = 
"gobblin.source.loopingDatasetFinderSource.endOfDatasets";
+  protected static final String GLOBAL_WATERMARK_DATASET_KEY =
+      "gobblin.source.loopingDatasetFinderSource.globalWatermarkDataset";
 
   private final URNLexicographicalComparator lexicographicalComparator = new 
URNLexicographicalComparator();
+  protected boolean isDatasetStateStoreEnabled;
 
   /**
    * @param drilldownIntoPartitions if set to true, will process each 
partition of a {@link PartitionableDataset} as a
@@ -81,19 +87,26 @@ public abstract class LoopingDatasetFinderSource<S, D> 
extends DatasetFinderSour
 
   @Override
   public WorkUnitStream getWorkunitStream(SourceState state) {
+    return this.getWorkunitStream(state,false);
+  }
+
+  public WorkUnitStream getWorkunitStream(SourceState state, boolean 
isDatasetStateStoreEnabled) {
+    this.isDatasetStateStoreEnabled = isDatasetStateStoreEnabled;
     try {
       int maxWorkUnits = state.getPropAsInt(MAX_WORK_UNITS_PER_RUN_KEY, 
MAX_WORK_UNITS_PER_RUN);
-
-      List<WorkUnitState> previousWorkUnitStates = 
state.getPreviousWorkUnitStates();
-      Optional<WorkUnitState> maxWorkUnit;
-      try {
-        maxWorkUnit = previousWorkUnitStates.stream().reduce((wu1, wu2) -> {
-          int wu1Ordinal = wu1.getPropAsInt(WORK_UNIT_ORDINAL);
-          int wu2Ordinal = wu2.getPropAsInt(WORK_UNIT_ORDINAL);
-          return wu1Ordinal > wu2Ordinal ? wu1 : wu2;
-        });
-      } catch (NumberFormatException nfe) {
-        throw new RuntimeException("Work units in state store are corrupted! 
Missing or malformed " + WORK_UNIT_ORDINAL);
+      Preconditions.checkArgument(maxWorkUnits > 0, "Max work units must be 
greater than 0!");
+      Config config = ConfigUtils.propertiesToConfig(state.getProperties());
+
+      List<WorkUnitState> previousWorkUnitStates = 
(this.isDatasetStateStoreEnabled) ? state
+          
.getPreviousWorkUnitStates(ConfigurationKeys.GLOBAL_WATERMARK_DATASET_URN)
+          : state.getPreviousWorkUnitStates();
+
+      Optional<WorkUnitState> maxWorkUnit = Optional.empty();
+      for (WorkUnitState workUnitState : previousWorkUnitStates) {
+        if (workUnitState.getPropAsBoolean(GLOBAL_WATERMARK_DATASET_KEY, 
false)) {
+          maxWorkUnit = Optional.of(workUnitState);
+          break;
+        }
       }
 
       String previousDatasetUrnWatermark = null;
@@ -105,11 +118,13 @@ public abstract class LoopingDatasetFinderSource<S, D> 
extends DatasetFinderSour
 
       IterableDatasetFinder datasetsFinder = createDatasetsFinder(state);
 
-      Stream<Dataset> datasetStream = 
datasetsFinder.getDatasetsStream(Spliterator.SORTED, 
this.lexicographicalComparator);
+      Stream<Dataset> datasetStream =
+          datasetsFinder.getDatasetsStream(Spliterator.SORTED, 
this.lexicographicalComparator);
       datasetStream = sortStreamLexicographically(datasetStream);
 
-      return new BasicWorkUnitStream.Builder(new 
DeepIterator(datasetStream.iterator(), previousDatasetUrnWatermark,
-          previousPartitionUrnWatermark, 
maxWorkUnits)).setFiniteStream(true).build();
+      return new BasicWorkUnitStream.Builder(
+          new DeepIterator(datasetStream.iterator(), 
previousDatasetUrnWatermark, previousPartitionUrnWatermark,
+              maxWorkUnits, config)).setFiniteStream(true).build();
     } catch (IOException ioe) {
       throw new RuntimeException(ioe);
     }
@@ -125,13 +140,16 @@ public abstract class LoopingDatasetFinderSource<S, D> 
extends DatasetFinderSour
 
     private Iterator<PartitionableDataset.DatasetPartition> 
currentPartitionIterator;
     private int generatedWorkUnits = 0;
+    private Dataset previousDataset;
+    private PartitionableDataset.DatasetPartition previousPartition;
 
     public DeepIterator(Iterator<Dataset> baseIterator, String 
previousDatasetUrnWatermark,
-        String previousPartitionUrnWatermark, int maxWorkUnits) {
+        String previousPartitionUrnWatermark, int maxWorkUnits, Config config)
+        throws IOException {
       this.maxWorkUnits = maxWorkUnits;
       this.baseIterator = baseIterator;
-
-      Dataset equalDataset = 
advanceUntilLargerThan(Iterators.peekingIterator(this.baseIterator), 
previousDatasetUrnWatermark);
+      Dataset equalDataset =
+          advanceUntilLargerThan(Iterators.peekingIterator(this.baseIterator), 
previousDatasetUrnWatermark);
 
       if (drilldownIntoPartitions && equalDataset != null && equalDataset 
instanceof PartitionableDataset) {
         this.currentPartitionIterator = 
getPartitionIterator((PartitionableDataset) equalDataset);
@@ -145,7 +163,8 @@ public abstract class LoopingDatasetFinderSource<S, D> 
extends DatasetFinderSour
      * Advance an iterator until the next value is larger than the reference.
      * @return the last value polled if it is equal to reference, or null 
otherwise.
      */
-    @Nullable  private <T extends URNIdentified> T 
advanceUntilLargerThan(PeekingIterator<T> it, String reference) {
+    @Nullable
+    private <T extends URNIdentified> T 
advanceUntilLargerThan(PeekingIterator<T> it, String reference) {
       if (reference == null) {
         return null;
       }
@@ -160,7 +179,8 @@ public abstract class LoopingDatasetFinderSource<S, D> 
extends DatasetFinderSour
     private Iterator<PartitionableDataset.DatasetPartition> 
getPartitionIterator(PartitionableDataset dataset) {
       try {
         return this.currentPartitionIterator = sortStreamLexicographically(
-            dataset.getPartitions(Spliterator.SORTED, 
LoopingDatasetFinderSource.this.lexicographicalComparator)).iterator();
+            dataset.getPartitions(Spliterator.SORTED, 
LoopingDatasetFinderSource.this.lexicographicalComparator))
+            .iterator();
       } catch (IOException ioe) {
         log.error("Failed to get partitions for dataset " + dataset.getUrn());
         return Iterators.emptyIterator();
@@ -169,7 +189,15 @@ public abstract class LoopingDatasetFinderSource<S, D> 
extends DatasetFinderSour
 
     @Override
     protected WorkUnit computeNext() {
-      if (this.generatedWorkUnits >= this.maxWorkUnits) {
+      if (this.generatedWorkUnits == this.maxWorkUnits) {
+        /**
+         * Add a special noop workunit to the end of the stream. This workunit 
contains the Dataset/Partition
+         * URN of the "last" dataset/partition (in lexicographic order). This 
is useful to
+         * efficiently determine the next dataset/partition to process in the 
subsequent run.
+         */
+        this.generatedWorkUnits++;
+        return generateNoopWorkUnit();
+      } else if (this.generatedWorkUnits > this.maxWorkUnits) {
         return endOfData();
       }
 
@@ -177,8 +205,11 @@ public abstract class LoopingDatasetFinderSource<S, D> 
extends DatasetFinderSour
         if (this.currentPartitionIterator != null && 
this.currentPartitionIterator.hasNext()) {
           PartitionableDataset.DatasetPartition partition = 
this.currentPartitionIterator.next();
           WorkUnit workUnit = workUnitForDatasetPartition(partition);
-          addDatasetInfoToWorkUnit(workUnit, partition.getDataset(), 
this.generatedWorkUnits++);
+          addDatasetInfoToWorkUnit(workUnit, partition.getDataset());
           addPartitionInfoToWorkUnit(workUnit, partition);
+          this.previousDataset = partition.getDataset();
+          this.previousPartition = partition;
+          this.generatedWorkUnits++;
           return workUnit;
         }
 
@@ -187,27 +218,44 @@ public abstract class LoopingDatasetFinderSource<S, D> 
extends DatasetFinderSour
           this.currentPartitionIterator = 
getPartitionIterator((PartitionableDataset) dataset);
         } else {
           WorkUnit workUnit = workUnitForDataset(dataset);
-          addDatasetInfoToWorkUnit(workUnit, dataset, 
this.generatedWorkUnits++);
+          addDatasetInfoToWorkUnit(workUnit, dataset);
+          this.previousDataset = dataset;
+          this.generatedWorkUnits++;
           return workUnit;
         }
       }
-
-      WorkUnit workUnit = NoopTask.noopWorkunit();
-      workUnit.setProp(WORK_UNIT_ORDINAL, this.generatedWorkUnits);
-
+      WorkUnit workUnit = generateNoopWorkUnit();
       this.generatedWorkUnits = Integer.MAX_VALUE;
-
       workUnit.setProp(END_OF_DATASETS_KEY, true);
       return workUnit;
     }
 
-    private void addDatasetInfoToWorkUnit(WorkUnit workUnit, Dataset dataset, 
int workUnitOrdinal) {
-      workUnit.setProp(DATASET_URN, dataset.getUrn());
-      workUnit.setProp(WORK_UNIT_ORDINAL, workUnitOrdinal);
+    private void addDatasetInfoToWorkUnit(WorkUnit workUnit, Dataset dataset) {
+      if (isDatasetStateStoreEnabled) {
+        workUnit.setProp(ConfigurationKeys.DATASET_URN_KEY, dataset.getUrn());
+      }
     }
 
     private void addPartitionInfoToWorkUnit(WorkUnit workUnit, 
PartitionableDataset.DatasetPartition partition) {
-      workUnit.setProp(PARTITION_URN, partition.getUrn());
+      if (isDatasetStateStoreEnabled) {
+        workUnit.setProp(ConfigurationKeys.DATASET_URN_KEY,
+            
Joiner.on(DATASET_PARTITION_DELIMITER).join(partition.getDataset().getUrn(), 
partition.getUrn()));
+      }
+    }
+
+    private WorkUnit generateNoopWorkUnit() {
+      WorkUnit workUnit = NoopTask.noopWorkunit();
+      workUnit.setProp(GLOBAL_WATERMARK_DATASET_KEY, true);
+      if (previousDataset != null) {
+        workUnit.setProp(DATASET_URN, previousDataset.getUrn());
+      }
+      if (drilldownIntoPartitions && this.previousPartition != null) {
+        workUnit.setProp(PARTITION_URN, previousPartition.getUrn());
+      }
+      if (isDatasetStateStoreEnabled) {
+        workUnit.setProp(ConfigurationKeys.DATASET_URN_KEY, 
ConfigurationKeys.GLOBAL_WATERMARK_DATASET_URN);
+      }
+      return workUnit;
     }
   }
 
@@ -216,11 +264,10 @@ public abstract class LoopingDatasetFinderSource<S, D> 
extends DatasetFinderSour
    */
   private <T extends URNIdentified> Stream<T> 
sortStreamLexicographically(Stream<T> inputStream) {
     Spliterator<T> spliterator = inputStream.spliterator();
-    if (spliterator.hasCharacteristics(Spliterator.SORTED) &&
-        spliterator.getComparator().equals(this.lexicographicalComparator)) {
+    if (spliterator.hasCharacteristics(Spliterator.SORTED) && 
spliterator.getComparator()
+        .equals(this.lexicographicalComparator)) {
       return StreamSupport.stream(spliterator, false);
     }
     return StreamSupport.stream(spliterator, 
false).sorted(this.lexicographicalComparator);
   }
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/b39bf8ca/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/source/LoopingDatasetFinderSourceTest.java
----------------------------------------------------------------------
diff --git 
a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/source/LoopingDatasetFinderSourceTest.java
 
b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/source/LoopingDatasetFinderSourceTest.java
index 76fe172..41745f1 100644
--- 
a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/source/LoopingDatasetFinderSourceTest.java
+++ 
b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/source/LoopingDatasetFinderSourceTest.java
@@ -14,14 +14,28 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.gobblin.data.management.source;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.List;
 import java.util.stream.Collectors;
-import java.util.stream.Stream;
 
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.mockito.Mockito;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+
+import org.apache.gobblin.metastore.DatasetStateStore;
+import org.apache.gobblin.source.extractor.WatermarkInterval;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.configuration.SourceState;
 import org.apache.gobblin.configuration.WorkUnitState;
 import org.apache.gobblin.dataset.Dataset;
@@ -31,27 +45,48 @@ import 
org.apache.gobblin.dataset.test.SimpleDatasetForTesting;
 import org.apache.gobblin.dataset.test.SimpleDatasetPartitionForTesting;
 import org.apache.gobblin.dataset.test.SimplePartitionableDatasetForTesting;
 import org.apache.gobblin.dataset.test.StaticDatasetsFinderForTesting;
+import org.apache.gobblin.runtime.FsDatasetStateStore;
+import org.apache.gobblin.runtime.JobState;
+import org.apache.gobblin.runtime.TaskState;
 import org.apache.gobblin.source.extractor.Extractor;
+import org.apache.gobblin.source.extractor.extract.LongWatermark;
 import org.apache.gobblin.source.workunit.WorkUnit;
 import org.apache.gobblin.source.workunit.WorkUnitStream;
-import org.mockito.Mockito;
-import org.testng.Assert;
-import org.testng.annotations.Test;
 
-import com.google.common.collect.Lists;
+import lombok.extern.slf4j.Slf4j;
 
+@Slf4j
 public class LoopingDatasetFinderSourceTest {
+  private static final String TEST_JOB_NAME_1 = "TestJob1";
+  private static final String TEST_JOB_NAME_2 = "TestJob2";
+  private static final String TEST_JOB_ID = "TestJob11";
+  private static final String TEST_TASK_ID_PREFIX = "TestTask-";
+  private static final String TEST_STATE_STORE_ROOT_DIR = 
"/tmp/LoopingSourceTest";
+
+  private FsDatasetStateStore fsDatasetStateStore;
+  private long startTime = System.currentTimeMillis();
+
+  @BeforeClass
+  public void setUp()
+      throws IOException {
+    this.fsDatasetStateStore = new 
FsDatasetStateStore(ConfigurationKeys.LOCAL_FS_URI, TEST_STATE_STORE_ROOT_DIR);
+
+    // clear data that may have been left behind by a prior test run
+    this.fsDatasetStateStore.delete(TEST_JOB_NAME_1);
+    this.fsDatasetStateStore.delete(TEST_JOB_NAME_2);
+  }
 
   @Test
   public void testNonDrilldown() {
     Dataset dataset1 = new SimpleDatasetForTesting("dataset1");
-    Dataset dataset2 = new SimplePartitionableDatasetForTesting("dataset2", 
Lists.newArrayList(new SimpleDatasetPartitionForTesting("p1"), new 
SimpleDatasetPartitionForTesting("p2")));
+    Dataset dataset2 = new SimplePartitionableDatasetForTesting("dataset2",
+        Lists.newArrayList(new SimpleDatasetPartitionForTesting("p1"), new 
SimpleDatasetPartitionForTesting("p2")));
     Dataset dataset3 = new SimpleDatasetForTesting("dataset3");
     Dataset dataset4 = new SimpleDatasetForTesting("dataset4");
     Dataset dataset5 = new SimpleDatasetForTesting("dataset5");
 
-    IterableDatasetFinder finder = new StaticDatasetsFinderForTesting(
-        Lists.newArrayList(dataset5, dataset4, dataset3, dataset2, dataset1));
+    IterableDatasetFinder finder =
+        new StaticDatasetsFinderForTesting(Lists.newArrayList(dataset5, 
dataset4, dataset3, dataset2, dataset1));
 
     MySource mySource = new MySource(false, finder);
 
@@ -61,13 +96,8 @@ public class LoopingDatasetFinderSourceTest {
     WorkUnitStream workUnitStream = mySource.getWorkunitStream(sourceState);
     List<WorkUnit> workUnits = 
Lists.newArrayList(workUnitStream.getWorkUnits());
 
-    Assert.assertEquals(workUnits.size(), 3);
-    
Assert.assertEquals(workUnits.get(0).getProp(DatasetFinderSourceTest.DATASET_URN),
 "dataset1");
-    
Assert.assertNull(workUnits.get(0).getProp(DatasetFinderSourceTest.PARTITION_URN));
-    
Assert.assertEquals(workUnits.get(1).getProp(DatasetFinderSourceTest.DATASET_URN),
 "dataset2");
-    
Assert.assertNull(workUnits.get(1).getProp(DatasetFinderSourceTest.PARTITION_URN));
-    
Assert.assertEquals(workUnits.get(2).getProp(DatasetFinderSourceTest.DATASET_URN),
 "dataset3");
-    
Assert.assertNull(workUnits.get(2).getProp(DatasetFinderSourceTest.PARTITION_URN));
+    Assert.assertEquals(workUnits.size(), 4);
+    verifyWorkUnitState(workUnits, "dataset3", null, false, false);
 
     // Second run should continue where it left off
     List<WorkUnitState> workUnitStates = 
workUnits.stream().map(WorkUnitState::new).collect(Collectors.toList());
@@ -78,11 +108,7 @@ public class LoopingDatasetFinderSourceTest {
     workUnits = Lists.newArrayList(workUnitStream.getWorkUnits());
 
     Assert.assertEquals(workUnits.size(), 3);
-    
Assert.assertEquals(workUnits.get(0).getProp(DatasetFinderSourceTest.DATASET_URN),
 "dataset4");
-    
Assert.assertNull(workUnits.get(0).getProp(DatasetFinderSourceTest.PARTITION_URN));
-    
Assert.assertEquals(workUnits.get(1).getProp(DatasetFinderSourceTest.DATASET_URN),
 "dataset5");
-    
Assert.assertNull(workUnits.get(1).getProp(DatasetFinderSourceTest.PARTITION_URN));
-    
Assert.assertTrue(workUnits.get(2).getPropAsBoolean(LoopingDatasetFinderSource.END_OF_DATASETS_KEY));
+    verifyWorkUnitState(workUnits, "dataset5", null, true, false);
 
     // Loop around
     workUnitStates = 
workUnits.stream().map(WorkUnitState::new).collect(Collectors.toList());
@@ -91,28 +117,22 @@ public class LoopingDatasetFinderSourceTest {
     workUnitStream = mySource.getWorkunitStream(sourceStateSpy);
     workUnits = Lists.newArrayList(workUnitStream.getWorkUnits());
 
-    Assert.assertEquals(workUnits.size(), 3);
-    
Assert.assertEquals(workUnits.get(0).getProp(DatasetFinderSourceTest.DATASET_URN),
 "dataset1");
-    
Assert.assertNull(workUnits.get(0).getProp(DatasetFinderSourceTest.PARTITION_URN));
-    
Assert.assertEquals(workUnits.get(1).getProp(DatasetFinderSourceTest.DATASET_URN),
 "dataset2");
-    
Assert.assertNull(workUnits.get(1).getProp(DatasetFinderSourceTest.PARTITION_URN));
-    
Assert.assertEquals(workUnits.get(2).getProp(DatasetFinderSourceTest.DATASET_URN),
 "dataset3");
-    
Assert.assertNull(workUnits.get(2).getProp(DatasetFinderSourceTest.PARTITION_URN));
+    Assert.assertEquals(workUnits.size(), 4);
+    verifyWorkUnitState(workUnits, "dataset3", null, false, false);
   }
 
   @Test
   public void testDrilldown() {
     // Create three datasets, two of them partitioned
     Dataset dataset1 = new SimpleDatasetForTesting("dataset1");
-    Dataset dataset2 = new SimplePartitionableDatasetForTesting("dataset2",
-        Lists.newArrayList(new SimpleDatasetPartitionForTesting("p1"),
-            new SimpleDatasetPartitionForTesting("p2"), new 
SimpleDatasetPartitionForTesting("p3")));
-    Dataset dataset3 = new SimplePartitionableDatasetForTesting("dataset3",
-        Lists.newArrayList(new SimpleDatasetPartitionForTesting("p1"),
-            new SimpleDatasetPartitionForTesting("p2"), new 
SimpleDatasetPartitionForTesting("p3")));
+    Dataset dataset2 = new SimplePartitionableDatasetForTesting("dataset2", 
Lists
+        .newArrayList(new SimpleDatasetPartitionForTesting("p1"), new 
SimpleDatasetPartitionForTesting("p2"),
+            new SimpleDatasetPartitionForTesting("p3")));
+    Dataset dataset3 = new SimplePartitionableDatasetForTesting("dataset3", 
Lists
+        .newArrayList(new SimpleDatasetPartitionForTesting("p1"), new 
SimpleDatasetPartitionForTesting("p2"),
+            new SimpleDatasetPartitionForTesting("p3")));
 
-    IterableDatasetFinder finder = new StaticDatasetsFinderForTesting(
-        Lists.newArrayList(dataset3, dataset2, dataset1));
+    IterableDatasetFinder finder = new 
StaticDatasetsFinderForTesting(Lists.newArrayList(dataset3, dataset2, 
dataset1));
 
     MySource mySource = new MySource(true, finder);
 
@@ -124,13 +144,8 @@ public class LoopingDatasetFinderSourceTest {
     WorkUnitStream workUnitStream = mySource.getWorkunitStream(sourceState);
     List<WorkUnit> workUnits = 
Lists.newArrayList(workUnitStream.getWorkUnits());
 
-    Assert.assertEquals(workUnits.size(), 3);
-    
Assert.assertEquals(workUnits.get(0).getProp(DatasetFinderSourceTest.DATASET_URN),
 "dataset1");
-    
Assert.assertNull(workUnits.get(0).getProp(DatasetFinderSourceTest.PARTITION_URN));
-    
Assert.assertEquals(workUnits.get(1).getProp(DatasetFinderSourceTest.DATASET_URN),
 "dataset2");
-    
Assert.assertEquals(workUnits.get(1).getProp(DatasetFinderSourceTest.PARTITION_URN),
 "p1");
-    
Assert.assertEquals(workUnits.get(2).getProp(DatasetFinderSourceTest.DATASET_URN),
 "dataset2");
-    
Assert.assertEquals(workUnits.get(2).getProp(DatasetFinderSourceTest.PARTITION_URN),
 "p2");
+    Assert.assertEquals(workUnits.size(), 4);
+    verifyWorkUnitState(workUnits, "dataset2", "p2", false, false);
 
     // Second run should continue where it left off
     List<WorkUnitState> workUnitStates = 
workUnits.stream().map(WorkUnitState::new).collect(Collectors.toList());
@@ -140,13 +155,8 @@ public class LoopingDatasetFinderSourceTest {
     workUnitStream = mySource.getWorkunitStream(sourceStateSpy);
     workUnits = Lists.newArrayList(workUnitStream.getWorkUnits());
 
-    Assert.assertEquals(workUnits.size(), 3);
-    
Assert.assertEquals(workUnits.get(0).getProp(DatasetFinderSourceTest.DATASET_URN),
 "dataset2");
-    
Assert.assertEquals(workUnits.get(0).getProp(DatasetFinderSourceTest.PARTITION_URN),
 "p3");
-    
Assert.assertEquals(workUnits.get(1).getProp(DatasetFinderSourceTest.DATASET_URN),
 "dataset3");
-    
Assert.assertEquals(workUnits.get(1).getProp(DatasetFinderSourceTest.PARTITION_URN),
 "p1");
-    
Assert.assertEquals(workUnits.get(2).getProp(DatasetFinderSourceTest.DATASET_URN),
 "dataset3");
-    
Assert.assertEquals(workUnits.get(2).getProp(DatasetFinderSourceTest.PARTITION_URN),
 "p2");
+    Assert.assertEquals(workUnits.size(), 4);
+    verifyWorkUnitState(workUnits, "dataset3", "p2", false, false);
 
     // third run, continue from where it left off
     workUnitStates = 
workUnits.stream().map(WorkUnitState::new).collect(Collectors.toList());
@@ -156,9 +166,7 @@ public class LoopingDatasetFinderSourceTest {
     workUnits = Lists.newArrayList(workUnitStream.getWorkUnits());
 
     Assert.assertEquals(workUnits.size(), 2);
-    
Assert.assertEquals(workUnits.get(0).getProp(DatasetFinderSourceTest.DATASET_URN),
 "dataset3");
-    
Assert.assertEquals(workUnits.get(0).getProp(DatasetFinderSourceTest.PARTITION_URN),
 "p3");
-    
Assert.assertTrue(workUnits.get(1).getPropAsBoolean(LoopingDatasetFinderSource.END_OF_DATASETS_KEY));
+    verifyWorkUnitState(workUnits, "dataset3", "p3", true, false);
 
     // fourth run, finished all work units, loop around
     workUnitStates = 
workUnits.stream().map(WorkUnitState::new).collect(Collectors.toList());
@@ -167,40 +175,438 @@ public class LoopingDatasetFinderSourceTest {
     workUnitStream = mySource.getWorkunitStream(sourceStateSpy);
     workUnits = Lists.newArrayList(workUnitStream.getWorkUnits());
 
+    Assert.assertEquals(workUnits.size(), 4);
+    verifyWorkUnitState(workUnits, "dataset2", "p2", false, false);
+  }
+
+  @Test
+  public void testNonDrilldownDatasetState()
+      throws IOException {
+    Dataset dataset1 = new SimpleDatasetForTesting("dataset1");
+    Dataset dataset2 = new SimplePartitionableDatasetForTesting("dataset2",
+        Lists.newArrayList(new SimpleDatasetPartitionForTesting("p1"), new 
SimpleDatasetPartitionForTesting("p2")));
+    Dataset dataset3 = new SimpleDatasetForTesting("dataset3");
+    Dataset dataset4 = new SimpleDatasetForTesting("dataset4");
+    Dataset dataset5 = new SimpleDatasetForTesting("dataset5");
+
+    IterableDatasetFinder finder =
+        new StaticDatasetsFinderForTesting(Lists.newArrayList(dataset5, 
dataset4, dataset3, dataset2, dataset1));
+
+    MySource mySource = new MySource(false, finder, fsDatasetStateStore, 
TEST_JOB_NAME_1);
+
+    SourceState sourceState = new SourceState();
+    sourceState.setProp(LoopingDatasetFinderSource.MAX_WORK_UNITS_PER_RUN_KEY, 
3);
+    sourceState.setProp(ConfigurationKeys.STATE_STORE_ROOT_DIR_KEY, 
TEST_STATE_STORE_ROOT_DIR);
+    sourceState.setProp(ConfigurationKeys.JOB_NAME_KEY, TEST_JOB_NAME_1);
+    WorkUnitStream workUnitStream = mySource.getWorkunitStream(sourceState, 
true);
+    List<WorkUnit> workUnits = 
Lists.newArrayList(workUnitStream.getWorkUnits());
+
+    Assert.assertEquals(workUnits.size(), 4);
+    List<LongWatermark> watermarks1 = new ArrayList<>();
+    List<Dataset> datasets1 = new ArrayList<>();
+    
Assert.assertEquals(workUnits.get(0).getProp(ConfigurationKeys.DATASET_URN_KEY),
 "dataset1");
+    
Assert.assertEquals(workUnits.get(0).getLowWatermark(LongWatermark.class).getValue(),
 0);
+    
watermarks1.add(workUnits.get(0).getExpectedHighWatermark(LongWatermark.class));
+    datasets1.add(dataset1);
+
+    
Assert.assertEquals(workUnits.get(1).getProp(ConfigurationKeys.DATASET_URN_KEY),
 "dataset2");
+    
Assert.assertEquals(workUnits.get(1).getLowWatermark(LongWatermark.class).getValue(),
 0);
+    
watermarks1.add(workUnits.get(1).getExpectedHighWatermark(LongWatermark.class));
+    datasets1.add(dataset2);
+
+    
Assert.assertEquals(workUnits.get(2).getProp(ConfigurationKeys.DATASET_URN_KEY),
 "dataset3");
+    
Assert.assertEquals(workUnits.get(2).getLowWatermark(LongWatermark.class).getValue(),
 0);
+    
watermarks1.add(workUnits.get(2).getExpectedHighWatermark(LongWatermark.class));
+    datasets1.add(dataset3);
+
+    
Assert.assertEquals(workUnits.get(3).getProp(ConfigurationKeys.DATASET_URN_KEY),
+        ConfigurationKeys.GLOBAL_WATERMARK_DATASET_URN);
+
+    Dataset globalWmDataset = new 
SimpleDatasetForTesting(ConfigurationKeys.GLOBAL_WATERMARK_DATASET_URN);
+    datasets1.add(globalWmDataset);
+
+    verifyWorkUnitState(workUnits,"dataset3", null, false, true);
+    persistDatasetState(datasets1, watermarks1, TEST_JOB_NAME_1);
+    testDatasetStates(datasets1, watermarks1, TEST_JOB_NAME_1);
+
+    // Second run should continue where it left off
+    List<LongWatermark> watermarks2 = new ArrayList<>();
+    List<Dataset> datasets2 = new ArrayList<>();
+
+    int workUnitSize = workUnits.size();
+    List<WorkUnitState> workUnitStates =
+        workUnits.subList(workUnitSize - 1, 
workUnitSize).stream().map(WorkUnitState::new).collect(Collectors.toList());
+    SourceState sourceStateSpy = Mockito.spy(sourceState);
+    
Mockito.doReturn(workUnitStates).when(sourceStateSpy).getPreviousWorkUnitStates(ConfigurationKeys.GLOBAL_WATERMARK_DATASET_URN);
+
+    workUnitStream = mySource.getWorkunitStream(sourceStateSpy,true);
+    workUnits = Lists.newArrayList(workUnitStream.getWorkUnits());
+
     Assert.assertEquals(workUnits.size(), 3);
-    
Assert.assertEquals(workUnits.get(0).getProp(DatasetFinderSourceTest.DATASET_URN),
 "dataset1");
-    
Assert.assertNull(workUnits.get(0).getProp(DatasetFinderSourceTest.PARTITION_URN));
-    
Assert.assertEquals(workUnits.get(1).getProp(DatasetFinderSourceTest.DATASET_URN),
 "dataset2");
-    
Assert.assertEquals(workUnits.get(1).getProp(DatasetFinderSourceTest.PARTITION_URN),
 "p1");
-    
Assert.assertEquals(workUnits.get(2).getProp(DatasetFinderSourceTest.DATASET_URN),
 "dataset2");
-    
Assert.assertEquals(workUnits.get(2).getProp(DatasetFinderSourceTest.PARTITION_URN),
 "p2");
+    
Assert.assertEquals(workUnits.get(0).getProp(ConfigurationKeys.DATASET_URN_KEY),
 "dataset4");
+    
Assert.assertEquals(workUnits.get(0).getLowWatermark(LongWatermark.class).getValue(),
 0);
+    
watermarks2.add(workUnits.get(0).getExpectedHighWatermark(LongWatermark.class));
+    datasets2.add(dataset4);
+
+    
Assert.assertEquals(workUnits.get(1).getProp(ConfigurationKeys.DATASET_URN_KEY),
 "dataset5");
+    
Assert.assertEquals(workUnits.get(1).getLowWatermark(LongWatermark.class).getValue(),
 0);
+    
watermarks2.add(workUnits.get(1).getExpectedHighWatermark(LongWatermark.class));
+    datasets2.add(dataset5);
+
+    
Assert.assertTrue(workUnits.get(2).getPropAsBoolean(LoopingDatasetFinderSource.END_OF_DATASETS_KEY));
+    
Assert.assertEquals(workUnits.get(2).getProp(ConfigurationKeys.DATASET_URN_KEY),
 ConfigurationKeys.GLOBAL_WATERMARK_DATASET_URN);
+    datasets2.add(globalWmDataset);
+
+    verifyWorkUnitState(workUnits,"dataset5",null,true, true);
+    persistDatasetState(datasets2, watermarks2, TEST_JOB_NAME_1);
+    testDatasetStates(datasets2, watermarks2, TEST_JOB_NAME_1);
+
+    // Loop around
+    List<LongWatermark> watermarks3 = new ArrayList<>();
+    List<Dataset> datasets3 = new ArrayList<>();
+
+    workUnitSize = workUnits.size();
+    workUnitStates = workUnits.subList(workUnitSize - 1, 
workUnitSize).stream().map(WorkUnitState::new).collect(Collectors.toList());
+    
Mockito.doReturn(workUnitStates).when(sourceStateSpy).getPreviousWorkUnitStates(ConfigurationKeys.GLOBAL_WATERMARK_DATASET_URN);
+
+    workUnitStream = mySource.getWorkunitStream(sourceStateSpy,true);
+    workUnits = Lists.newArrayList(workUnitStream.getWorkUnits());
+
+    Assert.assertEquals(workUnits.size(), 4);
+    
Assert.assertEquals(workUnits.get(0).getProp(ConfigurationKeys.DATASET_URN_KEY),
 "dataset1");
+    
Assert.assertEquals(workUnits.get(0).getLowWatermark(LongWatermark.class).getValue(),
 watermarks1.get(0).getValue());
+    
watermarks3.add(workUnits.get(0).getExpectedHighWatermark(LongWatermark.class));
+    datasets3.add(dataset1);
+
+    
Assert.assertEquals(workUnits.get(1).getProp(ConfigurationKeys.DATASET_URN_KEY),
 "dataset2");
+    
Assert.assertEquals(workUnits.get(1).getLowWatermark(LongWatermark.class).getValue(),
 watermarks1.get(1).getValue());
+    
watermarks3.add(workUnits.get(1).getExpectedHighWatermark(LongWatermark.class));
+    datasets3.add(dataset2);
+
+    
Assert.assertEquals(workUnits.get(2).getProp(ConfigurationKeys.DATASET_URN_KEY),
 "dataset3");
+    
Assert.assertEquals(workUnits.get(2).getLowWatermark(LongWatermark.class).getValue(),
 watermarks1.get(2).getValue());
+    
watermarks3.add(workUnits.get(2).getExpectedHighWatermark(LongWatermark.class));
+    datasets3.add(dataset3);
+
+    
Assert.assertEquals(workUnits.get(3).getProp(ConfigurationKeys.DATASET_URN_KEY),
 ConfigurationKeys.GLOBAL_WATERMARK_DATASET_URN);
+    datasets3.add(globalWmDataset);
+
+    verifyWorkUnitState(workUnits,"dataset3",null,false, true);
+    persistDatasetState(datasets3, watermarks3, TEST_JOB_NAME_1);
+    testDatasetStates(datasets3, watermarks3, TEST_JOB_NAME_1);
+  }
+
+  @Test
+  public void testDrilldownDatasetState()
+      throws IOException {
+    // Create three datasets, two of them partitioned
+    Dataset dataset1 = new SimpleDatasetForTesting("dataset1");
+    Dataset dataset2 = new SimplePartitionableDatasetForTesting("dataset2", 
Lists
+        .newArrayList(new SimpleDatasetPartitionForTesting("p1"), new 
SimpleDatasetPartitionForTesting("p2"),
+            new SimpleDatasetPartitionForTesting("p3")));
+    Dataset dataset3 = new SimplePartitionableDatasetForTesting("dataset3", 
Lists
+        .newArrayList(new SimpleDatasetPartitionForTesting("p1"), new 
SimpleDatasetPartitionForTesting("p2"),
+            new SimpleDatasetPartitionForTesting("p3")));
+
+    IterableDatasetFinder finder = new 
StaticDatasetsFinderForTesting(Lists.newArrayList(dataset3, dataset2, 
dataset1));
+
+    MySource mySource = new MySource(true, finder, fsDatasetStateStore, 
TEST_JOB_NAME_2);
+
+    // Limit to 3 wunits per run
+    SourceState sourceState = new SourceState();
+    sourceState.setProp(LoopingDatasetFinderSource.MAX_WORK_UNITS_PER_RUN_KEY, 
3);
+    sourceState.setProp(ConfigurationKeys.STATE_STORE_ROOT_DIR_KEY, 
TEST_STATE_STORE_ROOT_DIR);
+    sourceState.setProp(ConfigurationKeys.JOB_NAME_KEY, TEST_JOB_NAME_2);
+
+    // first run, get three first work units
+    WorkUnitStream workUnitStream = 
mySource.getWorkunitStream(sourceState,true);
+    List<WorkUnit> workUnits = 
Lists.newArrayList(workUnitStream.getWorkUnits());
+
+    List<LongWatermark> watermarks1 = new ArrayList<>();
+    List<Dataset> datasets1 = new ArrayList<>();
+
+    Assert.assertEquals(workUnits.size(), 4);
+    
Assert.assertEquals(workUnits.get(0).getProp(ConfigurationKeys.DATASET_URN_KEY),
 "dataset1");
+    
Assert.assertEquals(workUnits.get(0).getLowWatermark(LongWatermark.class).getValue(),
 0);
+    
watermarks1.add(workUnits.get(0).getExpectedHighWatermark(LongWatermark.class));
+    datasets1.add(dataset1);
+
+    
Assert.assertEquals(workUnits.get(1).getProp(ConfigurationKeys.DATASET_URN_KEY),
 "dataset2@p1");
+    
Assert.assertEquals(workUnits.get(1).getLowWatermark(LongWatermark.class).getValue(),
 0);
+    
watermarks1.add(workUnits.get(1).getExpectedHighWatermark(LongWatermark.class));
+    datasets1.add(new SimpleDatasetForTesting("dataset2@p1"));
+
+    
Assert.assertEquals(workUnits.get(2).getProp(ConfigurationKeys.DATASET_URN_KEY),
 "dataset2@p2");
+    
Assert.assertEquals(workUnits.get(2).getLowWatermark(LongWatermark.class).getValue(),
 0);
+    
watermarks1.add(workUnits.get(2).getExpectedHighWatermark(LongWatermark.class));
+    datasets1.add(new SimpleDatasetForTesting("dataset2@p2"));
+
+    
Assert.assertEquals(workUnits.get(3).getProp(ConfigurationKeys.DATASET_URN_KEY),
 ConfigurationKeys.GLOBAL_WATERMARK_DATASET_URN);
+    
Assert.assertEquals(workUnits.get(3).getProp(LoopingDatasetFinderSource.DATASET_URN),
 "dataset2");
+    
Assert.assertEquals(workUnits.get(3).getProp(LoopingDatasetFinderSource.PARTITION_URN),
 "p2");
+    Dataset globalWmDataset = new 
SimpleDatasetForTesting(ConfigurationKeys.GLOBAL_WATERMARK_DATASET_URN);
+    datasets1.add(globalWmDataset);
+
+    verifyWorkUnitState(workUnits,"dataset2","p2",false, true);
+    persistDatasetState(datasets1, watermarks1, TEST_JOB_NAME_2);
+    testDatasetStates(datasets1, watermarks1, TEST_JOB_NAME_2);
+
+    // Second run should continue where it left off
+    int workUnitSize = workUnits.size();
+    List<WorkUnitState> workUnitStates =
+        workUnits.subList(workUnitSize - 1, 
workUnitSize).stream().map(WorkUnitState::new).collect(Collectors.toList());
+
+    List<LongWatermark> watermarks2 = new ArrayList<>();
+    List<Dataset> datasets2 = new ArrayList<>();
+
+    SourceState sourceStateSpy = Mockito.spy(sourceState);
+    
Mockito.doReturn(workUnitStates).when(sourceStateSpy).getPreviousWorkUnitStates(ConfigurationKeys.GLOBAL_WATERMARK_DATASET_URN);
+
+    workUnitStream = mySource.getWorkunitStream(sourceStateSpy,true);
+    workUnits = Lists.newArrayList(workUnitStream.getWorkUnits());
+
+    Assert.assertEquals(workUnits.size(), 4);
+    
Assert.assertEquals(workUnits.get(0).getProp(ConfigurationKeys.DATASET_URN_KEY),
 "dataset2@p3");
+    
Assert.assertEquals(workUnits.get(0).getLowWatermark(LongWatermark.class).getValue(),
 0);
+    
watermarks2.add(workUnits.get(0).getExpectedHighWatermark(LongWatermark.class));
+    datasets2.add(new SimpleDatasetForTesting("dataset2@p3"));
+
+    
Assert.assertEquals(workUnits.get(1).getProp(ConfigurationKeys.DATASET_URN_KEY),
 "dataset3@p1");
+    
Assert.assertEquals(workUnits.get(1).getLowWatermark(LongWatermark.class).getValue(),
 0);
+    
watermarks2.add(workUnits.get(1).getExpectedHighWatermark(LongWatermark.class));
+    datasets2.add(new SimpleDatasetForTesting("dataset3@p1"));
+
+    
Assert.assertEquals(workUnits.get(2).getProp(ConfigurationKeys.DATASET_URN_KEY),
 "dataset3@p2");
+    
Assert.assertEquals(workUnits.get(2).getLowWatermark(LongWatermark.class).getValue(),
 0);
+    
watermarks2.add(workUnits.get(2).getExpectedHighWatermark(LongWatermark.class));
+    datasets2.add(new SimpleDatasetForTesting("dataset3@p2"));
+
+    
Assert.assertEquals(workUnits.get(3).getProp(ConfigurationKeys.DATASET_URN_KEY),
 ConfigurationKeys.GLOBAL_WATERMARK_DATASET_URN);
+    
Assert.assertEquals(workUnits.get(3).getProp(LoopingDatasetFinderSource.DATASET_URN),
 "dataset3");
+    
Assert.assertEquals(workUnits.get(3).getProp(LoopingDatasetFinderSource.PARTITION_URN),
 "p2");
+    datasets2.add(globalWmDataset);
+
+    verifyWorkUnitState(workUnits,"dataset3","p2",false, true);
+    persistDatasetState(datasets2, watermarks2, TEST_JOB_NAME_2);
+    testDatasetStates(datasets2, watermarks2, TEST_JOB_NAME_2);
+
+    // third run, continue from where it left off
+    workUnitSize = workUnits.size();
+    workUnitStates =
+        workUnits.subList(workUnitSize - 1, 
workUnitSize).stream().map(WorkUnitState::new).collect(Collectors.toList());
+    
Mockito.doReturn(workUnitStates).when(sourceStateSpy).getPreviousWorkUnitStates(ConfigurationKeys.GLOBAL_WATERMARK_DATASET_URN);
+
+    List<LongWatermark> watermarks3 = new ArrayList<>();
+    List<Dataset> datasets3 = new ArrayList<>();
+
+    workUnitStream = mySource.getWorkunitStream(sourceStateSpy,true);
+    workUnits = Lists.newArrayList(workUnitStream.getWorkUnits());
+
+    Assert.assertEquals(workUnits.size(), 2);
+    
Assert.assertEquals(workUnits.get(0).getProp(ConfigurationKeys.DATASET_URN_KEY),
 "dataset3@p3");
+    
Assert.assertEquals(workUnits.get(0).getLowWatermark(LongWatermark.class).getValue(),
 0);
+    
watermarks3.add(workUnits.get(0).getExpectedHighWatermark(LongWatermark.class));
+    datasets3.add(new SimpleDatasetForTesting("dataset3@p3"));
+
+    
Assert.assertTrue(workUnits.get(1).getPropAsBoolean(LoopingDatasetFinderSource.END_OF_DATASETS_KEY));
+    
Assert.assertEquals(workUnits.get(1).getProp(ConfigurationKeys.DATASET_URN_KEY),
 ConfigurationKeys.GLOBAL_WATERMARK_DATASET_URN);
+    
Assert.assertEquals(workUnits.get(1).getProp(LoopingDatasetFinderSource.DATASET_URN),
 "dataset3");
+    
Assert.assertEquals(workUnits.get(1).getProp(LoopingDatasetFinderSource.PARTITION_URN),
 "p3");
+    datasets3.add(globalWmDataset);
+
+    verifyWorkUnitState(workUnits,"dataset3","p3",true, true);
+    persistDatasetState(datasets3, watermarks3, TEST_JOB_NAME_2);
+    testDatasetStates(datasets3, watermarks3, TEST_JOB_NAME_2);
+
+    // fourth run, finished all work units, loop around
+    workUnitSize = workUnits.size();
+    workUnitStates =
+        workUnits.subList(workUnitSize - 1, 
workUnitSize).stream().map(WorkUnitState::new).collect(Collectors.toList());
+    
Mockito.doReturn(workUnitStates).when(sourceStateSpy).getPreviousWorkUnitStates(ConfigurationKeys.GLOBAL_WATERMARK_DATASET_URN);
+
+    List<LongWatermark> watermarks4 = new ArrayList<>();
+    List<Dataset> datasets4 = new ArrayList<>();
+
+    workUnitStream = mySource.getWorkunitStream(sourceStateSpy,true);
+    workUnits = Lists.newArrayList(workUnitStream.getWorkUnits());
+
+    Assert.assertEquals(workUnits.size(), 4);
+    
Assert.assertEquals(workUnits.get(0).getProp(ConfigurationKeys.DATASET_URN_KEY),
 "dataset1");
+    
Assert.assertEquals(workUnits.get(0).getLowWatermark(LongWatermark.class).getValue(),
 watermarks1.get(0).getValue());
+    
watermarks4.add(workUnits.get(0).getExpectedHighWatermark(LongWatermark.class));
+    datasets4.add(new SimpleDatasetForTesting("dataset1"));
+
+    
Assert.assertEquals(workUnits.get(1).getProp(ConfigurationKeys.DATASET_URN_KEY),
 "dataset2@p1");
+    
Assert.assertEquals(workUnits.get(1).getLowWatermark(LongWatermark.class).getValue(),
 watermarks1.get(1).getValue());
+    
watermarks4.add(workUnits.get(1).getExpectedHighWatermark(LongWatermark.class));
+    datasets4.add(new SimpleDatasetForTesting("dataset2@p1"));
+
+    
Assert.assertEquals(workUnits.get(2).getProp(ConfigurationKeys.DATASET_URN_KEY),
 "dataset2@p2");
+    
Assert.assertEquals(workUnits.get(2).getLowWatermark(LongWatermark.class).getValue(),
 watermarks1.get(2).getValue());
+    
watermarks4.add(workUnits.get(2).getExpectedHighWatermark(LongWatermark.class));
+    datasets4.add(new SimpleDatasetForTesting("dataset2@p2"));
+
+    
Assert.assertEquals(workUnits.get(3).getProp(ConfigurationKeys.DATASET_URN_KEY),
 ConfigurationKeys.GLOBAL_WATERMARK_DATASET_URN);
+    datasets4.add(new 
SimpleDatasetForTesting(ConfigurationKeys.GLOBAL_WATERMARK_DATASET_URN));
+
+    verifyWorkUnitState(workUnits,"dataset2","p2",false,true);
+    persistDatasetState(datasets4, watermarks4, TEST_JOB_NAME_2);
+    testDatasetStates(datasets4, watermarks4, TEST_JOB_NAME_2);
+  }
+
+  public void verifyWorkUnitState(List<WorkUnit> workUnits, String datasetUrn, 
String partitionUrn,
+      boolean endOfDatasets, boolean isDatasetStateStoreEnabled) {
+    int i;
+    for (i = 0; i < workUnits.size() - 1; i++) {
+      
Assert.assertNull(workUnits.get(i).getProp(LoopingDatasetFinderSource.DATASET_URN));
+      
Assert.assertNull(workUnits.get(i).getProp(LoopingDatasetFinderSource.PARTITION_URN));
+      if(!isDatasetStateStoreEnabled) {
+        
Assert.assertNull(workUnits.get(i).getProp(ConfigurationKeys.DATASET_URN_KEY));
+      }
+      
Assert.assertNull(workUnits.get(i).getProp(LoopingDatasetFinderSource.GLOBAL_WATERMARK_DATASET_KEY));
+      
Assert.assertNull(workUnits.get(i).getProp(LoopingDatasetFinderSource.END_OF_DATASETS_KEY));
+    }
+    
Assert.assertEquals(workUnits.get(i).getProp(LoopingDatasetFinderSource.DATASET_URN),
 datasetUrn);
+    if (partitionUrn != null) {
+      
Assert.assertEquals(workUnits.get(i).getProp(LoopingDatasetFinderSource.PARTITION_URN),
 partitionUrn);
+    } else {
+      
Assert.assertNull(workUnits.get(i).getProp(LoopingDatasetFinderSource.PARTITION_URN));
+    }
+    if (!endOfDatasets) {
+      
Assert.assertNull(workUnits.get(i).getProp(LoopingDatasetFinderSource.END_OF_DATASETS_KEY));
+    } else {
+      
Assert.assertTrue(workUnits.get(i).getPropAsBoolean(LoopingDatasetFinderSource.END_OF_DATASETS_KEY));
+    }
+    Assert
+        
.assertEquals(workUnits.get(i).getPropAsBoolean(LoopingDatasetFinderSource.GLOBAL_WATERMARK_DATASET_KEY),
 true);
+  }
+
+  public void persistDatasetState(List<Dataset> datasets, List<LongWatermark> 
watermarks, String jobName)
+      throws IOException {
+    Preconditions.checkArgument(datasets.size() >= 2);
+    for (int i = 0; i < datasets.size(); i++) {
+      String datasetUrn = datasets.get(i).getUrn();
+      JobState.DatasetState datasetState = new JobState.DatasetState(jobName, 
TEST_JOB_ID);
+
+      datasetState.setDatasetUrn(datasetUrn);
+      datasetState.setState(JobState.RunningState.COMMITTED);
+      datasetState.setId(datasetUrn);
+      datasetState.setStartTime(this.startTime);
+      datasetState.setEndTime(this.startTime + 1000);
+      datasetState.setDuration(1000);
+
+      TaskState taskState = new TaskState();
+      taskState.setJobId(TEST_JOB_ID);
+      taskState.setTaskId(TEST_TASK_ID_PREFIX + i);
+      taskState.setId(TEST_TASK_ID_PREFIX + i);
+      taskState.setWorkingState(WorkUnitState.WorkingState.COMMITTED);
+      if (i < datasets.size() - 1) {
+        taskState.setActualHighWatermark(watermarks.get(i));
+      }
+      datasetState.addTaskState(taskState);
+
+      this.fsDatasetStateStore.persistDatasetState(datasetUrn, datasetState);
+    }
+  }
+
+  private void testDatasetStates(List<Dataset> datasets, List<LongWatermark> 
watermarks, String jobName)
+      throws IOException {
+    Preconditions.checkArgument(datasets.size() >= 2);
+    for (int i = 0; i < datasets.size(); i++) {
+      JobState.DatasetState datasetState =
+          this.fsDatasetStateStore.getLatestDatasetState(jobName, 
datasets.get(i).getUrn());
+
+      Assert.assertEquals(datasetState.getDatasetUrn(), 
datasets.get(i).getUrn());
+      Assert.assertEquals(datasetState.getJobName(), jobName);
+      Assert.assertEquals(datasetState.getJobId(), TEST_JOB_ID);
+      Assert.assertEquals(datasetState.getState(), 
JobState.RunningState.COMMITTED);
+      Assert.assertEquals(datasetState.getStartTime(), this.startTime);
+      Assert.assertEquals(datasetState.getEndTime(), this.startTime + 1000);
+      Assert.assertEquals(datasetState.getDuration(), 1000);
+
+      Assert.assertEquals(datasetState.getCompletedTasks(), 1);
+      TaskState taskState = datasetState.getTaskStates().get(0);
+      Assert.assertEquals(taskState.getJobId(), TEST_JOB_ID);
+      Assert.assertEquals(taskState.getTaskId(), TEST_TASK_ID_PREFIX + i);
+      Assert.assertEquals(taskState.getId(), TEST_TASK_ID_PREFIX + i);
+      Assert.assertEquals(taskState.getWorkingState(), 
WorkUnitState.WorkingState.COMMITTED);
+      if (i < datasets.size() - 1) {
+        
Assert.assertEquals(taskState.getActualHighWatermark(LongWatermark.class).getValue(),
+            watermarks.get(i).getValue());
+      }
+    }
   }
 
   public static class MySource extends LoopingDatasetFinderSource<String, 
String> {
     private final IterableDatasetFinder datasetsFinder;
+    private boolean isDatasetStateStoreEnabled;
+    private DatasetStateStore fsDatasetStateStore;
+    private String jobName;
+    private Long LAST_PROCESSED_TS = System.currentTimeMillis();
+
+    MySource(boolean drilldownIntoPartitions, IterableDatasetFinder 
datasetsFinder) {
+      super(drilldownIntoPartitions);
+      this.datasetsFinder = datasetsFinder;
+      this.isDatasetStateStoreEnabled = false;
+    }
 
-    public MySource(boolean drilldownIntoPartitions, IterableDatasetFinder 
datasetsFinder) {
+    MySource(boolean drilldownIntoPartitions, IterableDatasetFinder 
datasetsFinder,
+        FsDatasetStateStore fsDatasetStateStore, String jobName) {
       super(drilldownIntoPartitions);
       this.datasetsFinder = datasetsFinder;
+      this.isDatasetStateStoreEnabled = true;
+      this.fsDatasetStateStore = fsDatasetStateStore;
+      this.jobName = jobName;
     }
 
     @Override
-    public Extractor<String, String> getExtractor(WorkUnitState state) throws 
IOException {
+    public Extractor<String, String> getExtractor(WorkUnitState state)
+        throws IOException {
       return null;
     }
 
     @Override
     protected WorkUnit workUnitForDataset(Dataset dataset) {
       WorkUnit workUnit = new WorkUnit();
-      workUnit.setProp(DatasetFinderSourceTest.DATASET_URN, dataset.getUrn());
+      if(isDatasetStateStoreEnabled) {
+        JobState.DatasetState datasetState = null;
+        try {
+          datasetState =
+              (JobState.DatasetState) 
this.fsDatasetStateStore.getLatestDatasetState(this.jobName, dataset.getUrn());
+        } catch (IOException e) {
+          throw new RuntimeException(e);
+        }
+        LongWatermark previousWatermark;
+        if(datasetState != null) {
+          previousWatermark = 
datasetState.getTaskStatesAsWorkUnitStates().get(0).getActualHighWatermark(LongWatermark.class);
+        } else {
+          previousWatermark = new LongWatermark(0);
+        }
+        workUnit.setWatermarkInterval(new WatermarkInterval(previousWatermark, 
new LongWatermark(LAST_PROCESSED_TS)));
+      }
       return workUnit;
     }
 
     @Override
     protected WorkUnit 
workUnitForDatasetPartition(PartitionableDataset.DatasetPartition partition) {
       WorkUnit workUnit = new WorkUnit();
-      workUnit.setProp(DatasetFinderSourceTest.DATASET_URN, 
partition.getDataset().getUrn());
-      workUnit.setProp(DatasetFinderSourceTest.PARTITION_URN, 
partition.getUrn());
+      if(isDatasetStateStoreEnabled) {
+        String datasetUrn = 
partition.getDataset().getUrn()+"@"+partition.getUrn();
+        JobState.DatasetState datasetState = null;
+        try {
+          datasetState =
+              (JobState.DatasetState) 
this.fsDatasetStateStore.getLatestDatasetState(this.jobName, datasetUrn);
+        } catch (IOException e) {
+          throw new RuntimeException(e);
+        }
+        LongWatermark previousWatermark;
+        if(datasetState != null) {
+          previousWatermark = 
datasetState.getTaskStatesAsWorkUnitStates().get(0).getActualHighWatermark(LongWatermark.class);
+        } else {
+          previousWatermark = new LongWatermark(0);
+        }
+        workUnit.setWatermarkInterval(new WatermarkInterval(previousWatermark, 
new LongWatermark(LAST_PROCESSED_TS)));
+      }
       return workUnit;
     }
 
@@ -210,9 +616,19 @@ public class LoopingDatasetFinderSourceTest {
     }
 
     @Override
-    protected IterableDatasetFinder createDatasetsFinder(SourceState state) throws IOException {
+    protected IterableDatasetFinder createDatasetsFinder(SourceState state)
+        throws IOException {
       return this.datasetsFinder;
     }
   }
 
+  @AfterClass
+  public void tearDown()
+      throws IOException {
+    FileSystem fs = FileSystem.getLocal(new Configuration(false));
+    Path rootDir = new Path(TEST_STATE_STORE_ROOT_DIR);
+    if (fs.exists(rootDir)) {
+      fs.delete(rootDir, true);
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/b39bf8ca/gobblin-modules/gobblin-helix/src/main/java/org/apache/gobblin/runtime/ZkDatasetStateStore.java
----------------------------------------------------------------------
diff --git 
a/gobblin-modules/gobblin-helix/src/main/java/org/apache/gobblin/runtime/ZkDatasetStateStore.java
 
b/gobblin-modules/gobblin-helix/src/main/java/org/apache/gobblin/runtime/ZkDatasetStateStore.java
index db882c0..fdde5eb 100644
--- 
a/gobblin-modules/gobblin-helix/src/main/java/org/apache/gobblin/runtime/ZkDatasetStateStore.java
+++ 
b/gobblin-modules/gobblin-helix/src/main/java/org/apache/gobblin/runtime/ZkDatasetStateStore.java
@@ -93,7 +93,7 @@ public class ZkDatasetStateStore extends ZkStateStore<JobState.DatasetState>
   public JobState.DatasetState getLatestDatasetState(String storeName, String datasetUrn) throws IOException {
     String alias =
         Strings.isNullOrEmpty(datasetUrn) ? CURRENT_DATASET_STATE_FILE_SUFFIX + DATASET_STATE_STORE_TABLE_SUFFIX
-            : datasetUrn + "-" + CURRENT_DATASET_STATE_FILE_SUFFIX + DATASET_STATE_STORE_TABLE_SUFFIX;
+            : CharMatcher.is(':').replaceFrom(datasetUrn, '.') + "-" + CURRENT_DATASET_STATE_FILE_SUFFIX + DATASET_STATE_STORE_TABLE_SUFFIX;
     return get(storeName, alias, datasetUrn);
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/b39bf8ca/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/CombinedWorkUnitAndDatasetStateGenerator.java
----------------------------------------------------------------------
diff --git 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/CombinedWorkUnitAndDatasetStateGenerator.java
 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/CombinedWorkUnitAndDatasetStateGenerator.java
new file mode 100644
index 0000000..3e2ccfb
--- /dev/null
+++ 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/CombinedWorkUnitAndDatasetStateGenerator.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.runtime;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.gobblin.configuration.CombinedWorkUnitAndDatasetState;
+import org.apache.gobblin.configuration.CombinedWorkUnitAndDatasetStateFunctional;
+import org.apache.gobblin.configuration.WorkUnitState;
+import org.apache.gobblin.metastore.DatasetStateStore;
+
+import com.google.common.base.Strings;
+import com.google.common.collect.ImmutableMap;
+
+
+/**
+ * A class that returns previous {@link WorkUnitState}s and {@link JobState.DatasetState}s from the state store
+ * as a {@link CombinedWorkUnitAndDatasetState}.
+ */
+
+public class CombinedWorkUnitAndDatasetStateGenerator implements CombinedWorkUnitAndDatasetStateFunctional {
+  private DatasetStateStore datasetStateStore;
+  private String jobName;
+
+  /**
+   * Constructor.
+   *
+   * @param datasetStateStore the dataset state store
+   * @param jobName the job name
+   */
+  public CombinedWorkUnitAndDatasetStateGenerator(DatasetStateStore datasetStateStore, String jobName) {
+    this.datasetStateStore = datasetStateStore;
+    this.jobName = jobName;
+  }
+
+  @Override
+  public CombinedWorkUnitAndDatasetState getCombinedWorkUnitAndDatasetState(String datasetUrn)
+      throws Exception {
+    Map<String, JobState.DatasetState> datasetStateMap = ImmutableMap.of();
+    List<WorkUnitState> workUnitStates = new ArrayList<>();
+    if (Strings.isNullOrEmpty(datasetUrn)) {
+      datasetStateMap = this.datasetStateStore.getLatestDatasetStatesByUrns(this.jobName);
+      workUnitStates = JobState.workUnitStatesFromDatasetStates(datasetStateMap.values());
+    } else {
+      JobState.DatasetState datasetState =
+          (JobState.DatasetState) this.datasetStateStore.getLatestDatasetState(this.jobName, datasetUrn);
+      if (datasetState != null) {
+        datasetStateMap = ImmutableMap.of(datasetUrn, datasetState);
+        workUnitStates = JobState.workUnitStatesFromDatasetStates(Arrays.asList(datasetState));
+      }
+    }
+    return new CombinedWorkUnitAndDatasetState(workUnitStates, datasetStateMap);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/b39bf8ca/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/FsDatasetStateStore.java
----------------------------------------------------------------------
diff --git 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/FsDatasetStateStore.java
 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/FsDatasetStateStore.java
index 9da34d7..5ccf985 100644
--- 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/FsDatasetStateStore.java
+++ 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/FsDatasetStateStore.java
@@ -30,11 +30,6 @@ import java.util.concurrent.ExecutionException;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
-import org.apache.gobblin.metastore.predicates.StateStorePredicate;
-import org.apache.gobblin.metastore.predicates.StoreNamePredicate;
-import 
org.apache.gobblin.runtime.metastore.filesystem.FsDatasetStateStoreEntryManager;
-import org.apache.gobblin.util.filters.HiddenFilter;
-import org.apache.gobblin.util.hadoop.GobblinSequenceFileReader;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
@@ -58,17 +53,22 @@ import com.google.common.collect.Lists;
 import com.typesafe.config.Config;
 import com.typesafe.config.ConfigValue;
 
-import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.metastore.predicates.StateStorePredicate;
+import org.apache.gobblin.metastore.predicates.StoreNamePredicate;
 import org.apache.gobblin.metastore.DatasetStateStore;
 import org.apache.gobblin.metastore.FsStateStore;
 import org.apache.gobblin.metastore.nameParser.DatasetUrnStateStoreNameParser;
 import 
org.apache.gobblin.metastore.nameParser.SimpleDatasetUrnStateStoreNameParser;
+import 
org.apache.gobblin.runtime.metastore.filesystem.FsDatasetStateStoreEntryManager;
+import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.util.ConfigUtils;
 import org.apache.gobblin.util.Either;
 import org.apache.gobblin.util.ExecutorsUtils;
 import org.apache.gobblin.util.WritableShimSerialization;
 import org.apache.gobblin.util.executors.IteratorExecutor;
 import org.apache.gobblin.util.reflection.GobblinConstructorUtils;
+import org.apache.gobblin.util.filters.HiddenFilter;
+import org.apache.gobblin.util.hadoop.GobblinSequenceFileReader;
 
 
 /**
@@ -187,7 +187,8 @@ public class FsDatasetStateStore extends 
FsStateStore<JobState.DatasetState> imp
     return getInternal(storeName, tableName, stateId, false);
   }
 
-  public JobState.DatasetState getInternal(String storeName, String tableName, 
String stateId, boolean sanitizeKeyForComparison)
+  public JobState.DatasetState getInternal(String storeName, String tableName, 
String stateId,
+      boolean sanitizeKeyForComparison)
       throws IOException {
     Path tablePath = new Path(new Path(this.storeRootDir, storeName), 
tableName);
 
@@ -206,8 +207,9 @@ public class FsDatasetStateStore extends 
FsStateStore<JobState.DatasetState> imp
         Text key = new Text();
 
         while (reader.next(key)) {
-          String stringKey = sanitizeKeyForComparison ?
-              sanitizeDatasetStatestoreNameFromDatasetURN(storeName, 
key.toString()) : key.toString();
+          String stringKey =
+              sanitizeKeyForComparison ? 
sanitizeDatasetStatestoreNameFromDatasetURN(storeName, key.toString())
+                  : key.toString();
           writable = reader.getCurrentValue(writable);
           if (stringKey.equals(stateId)) {
             if (writable instanceof JobState.DatasetState) {
@@ -235,19 +237,19 @@ public class FsDatasetStateStore extends 
FsStateStore<JobState.DatasetState> imp
 
     Configuration deserializeConfig = new Configuration(this.conf);
     WritableShimSerialization.addToHadoopConfiguration(deserializeConfig);
-    try (@SuppressWarnings("deprecation") GobblinSequenceFileReader reader = 
new GobblinSequenceFileReader(this.fs, tablePath,
-        deserializeConfig)) {
+    try (@SuppressWarnings("deprecation") GobblinSequenceFileReader reader = 
new GobblinSequenceFileReader(this.fs,
+        tablePath, deserializeConfig)) {
 
       /**
        * Add this change so that all stateful flow will have back 
compatibility.
        * Shim layer of state store is therefore avoided because of this change.
        * Keep the implementation of Shim layer temporarily.
        */
-     String className = reader.getValueClassName();
-     if (className.startsWith("gobblin")) {
-       LOGGER.warn("There's old JobState with no apache package name being 
read while we cast them at runtime");
-       className = "org.apache." + className;
-     }
+      String className = reader.getValueClassName();
+      if (className.startsWith("gobblin")) {
+        LOGGER.warn("There's old JobState with no apache package name being 
read while we cast them at runtime");
+        className = "org.apache." + className;
+      }
 
       if (!className.equals(JobState.class.getName()) && 
!className.equals(JobState.DatasetState.class.getName())) {
         throw new RuntimeException("There is a mismatch in the Class Type of 
state in state-store and that in runtime");
@@ -363,8 +365,8 @@ public class FsDatasetStateStore extends FsStateStore<JobState.DatasetState> imp
 
     String alias =
         Strings.isNullOrEmpty(datasetUrn) ? CURRENT_DATASET_STATE_FILE_SUFFIX + DATASET_STATE_STORE_TABLE_SUFFIX
-            : sanitizeDatasetStatestoreNameFromDatasetURN(storeName, datasetUrn) + "-"
-                + CURRENT_DATASET_STATE_FILE_SUFFIX + DATASET_STATE_STORE_TABLE_SUFFIX;
+            : sanitizeDatasetStatestoreNameFromDatasetURN(storeName, CharMatcher.is(':').replaceFrom(datasetUrn, '.'))
+                + "-" + CURRENT_DATASET_STATE_FILE_SUFFIX + DATASET_STATE_STORE_TABLE_SUFFIX;
     return get(storeName, alias, datasetUrn);
   }
 
@@ -424,9 +426,9 @@ public class FsDatasetStateStore extends 
FsStateStore<JobState.DatasetState> imp
   public List<FsDatasetStateStoreEntryManager> 
getMetadataForTables(StateStorePredicate predicate)
       throws IOException {
 
-    Stream<Path> stores = predicate instanceof StoreNamePredicate ?
-        Stream.of(new Path(this.storeRootDir, ((StoreNamePredicate) 
predicate).getStoreName())) :
-        lsStream(new Path(this.storeRootDir)).map(FileStatus::getPath);
+    Stream<Path> stores = predicate instanceof StoreNamePredicate ? Stream
+        .of(new Path(this.storeRootDir, ((StoreNamePredicate) 
predicate).getStoreName()))
+        : lsStream(new Path(this.storeRootDir)).map(FileStatus::getPath);
 
     if (stores == null) {
       return Lists.newArrayList();

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/b39bf8ca/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/JobContext.java
----------------------------------------------------------------------
diff --git 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/JobContext.java 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/JobContext.java
index d8fbe4e..2ed8711 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/JobContext.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/JobContext.java
@@ -149,10 +149,10 @@ public class JobContext implements Closeable {
 
     State jobPropsState = new State();
     jobPropsState.addAll(jobProps);
-    this.jobState =
-        new JobState(jobPropsState, this.datasetStateStore.getLatestDatasetStatesByUrns(this.jobName), this.jobName,
-            this.jobId);
+
+    this.jobState = new JobState(jobPropsState, this.jobName, this.jobId);
     this.jobState.setBroker(this.jobBroker);
+    this.jobState.setWorkUnitAndDatasetStateFunctional(new CombinedWorkUnitAndDatasetStateGenerator(this.datasetStateStore, this.jobName));
 
     stagingDirProvided = this.jobState.contains(ConfigurationKeys.WRITER_STAGING_DIR);
     outputDirProvided = this.jobState.contains(ConfigurationKeys.WRITER_OUTPUT_DIR);
@@ -189,10 +189,8 @@ public class JobContext implements Closeable {
     if (!stateStoreEnabled) {
       stateStoreType = ConfigurationKeys.STATE_STORE_TYPE_NOOP;
     } else {
-      stateStoreType = ConfigUtils
-          .getString(jobConfig, ConfigurationKeys.DATASET_STATE_STORE_TYPE_KEY,
-              ConfigUtils.getString(jobConfig, 
ConfigurationKeys.STATE_STORE_TYPE_KEY,
-                  ConfigurationKeys.DEFAULT_STATE_STORE_TYPE));
+      stateStoreType = ConfigUtils.getString(jobConfig, 
ConfigurationKeys.DATASET_STATE_STORE_TYPE_KEY, ConfigUtils
+          .getString(jobConfig, ConfigurationKeys.STATE_STORE_TYPE_KEY, 
ConfigurationKeys.DEFAULT_STATE_STORE_TYPE));
     }
 
     ClassAliasResolver<DatasetStateStore.Factory> resolver = new 
ClassAliasResolver<>(DatasetStateStore.Factory.class);
@@ -228,8 +226,8 @@ public class JobContext implements Closeable {
     
Preconditions.checkState(this.jobState.contains(FsCommitSequenceStore.GOBBLIN_RUNTIME_COMMIT_SEQUENCE_STORE_DIR));
 
     try (FileSystem fs = FileSystem.get(URI.create(this.jobState
-            
.getProp(FsCommitSequenceStore.GOBBLIN_RUNTIME_COMMIT_SEQUENCE_STORE_FS_URI,
-                ConfigurationKeys.LOCAL_FS_URI)), 
HadoopUtils.getConfFromState(this.jobState))) {
+            
.getProp(FsCommitSequenceStore.GOBBLIN_RUNTIME_COMMIT_SEQUENCE_STORE_FS_URI, 
ConfigurationKeys.LOCAL_FS_URI)),
+        HadoopUtils.getConfFromState(this.jobState))) {
 
       return Optional.<CommitSequenceStore>of(new FsCommitSequenceStore(fs,
           new 
Path(this.jobState.getProp(FsCommitSequenceStore.GOBBLIN_RUNTIME_COMMIT_SEQUENCE_STORE_DIR))));
@@ -311,21 +309,24 @@ public class JobContext implements Closeable {
     // Add jobId to writer staging dir
     if (this.jobState.contains(ConfigurationKeys.WRITER_STAGING_DIR)) {
       String writerStagingDirWithJobId =
-          new 
Path(getJobDir(this.jobState.getProp(ConfigurationKeys.WRITER_STAGING_DIR), 
this.getJobName()), this.jobId).toString();
+          new 
Path(getJobDir(this.jobState.getProp(ConfigurationKeys.WRITER_STAGING_DIR), 
this.getJobName()),
+              this.jobId).toString();
       this.jobState.setProp(ConfigurationKeys.WRITER_STAGING_DIR, 
writerStagingDirWithJobId);
     }
 
     // Add jobId to writer output dir
     if (this.jobState.contains(ConfigurationKeys.WRITER_OUTPUT_DIR)) {
       String writerOutputDirWithJobId =
-          new 
Path(getJobDir(this.jobState.getProp(ConfigurationKeys.WRITER_OUTPUT_DIR), 
this.getJobName()), this.jobId).toString();
+          new 
Path(getJobDir(this.jobState.getProp(ConfigurationKeys.WRITER_OUTPUT_DIR), 
this.getJobName()), this.jobId)
+              .toString();
       this.jobState.setProp(ConfigurationKeys.WRITER_OUTPUT_DIR, 
writerOutputDirWithJobId);
     }
 
     // Add jobId to task data root dir
     if (this.jobState.contains(ConfigurationKeys.TASK_DATA_ROOT_DIR_KEY)) {
       String taskDataRootDirWithJobId =
-          new 
Path(getJobDir(this.jobState.getProp(ConfigurationKeys.TASK_DATA_ROOT_DIR_KEY), 
this.getJobName()), this.jobId).toString();
+          new 
Path(getJobDir(this.jobState.getProp(ConfigurationKeys.TASK_DATA_ROOT_DIR_KEY), 
this.getJobName()),
+              this.jobId).toString();
       this.jobState.setProp(ConfigurationKeys.TASK_DATA_ROOT_DIR_KEY, 
taskDataRootDirWithJobId);
 
       setTaskStagingDir();
@@ -347,8 +348,8 @@ public class JobContext implements Closeable {
           ConfigurationKeys.WRITER_STAGING_DIR, 
ConfigurationKeys.TASK_DATA_ROOT_DIR_KEY));
     } else {
       String workingDir = 
this.jobState.getProp(ConfigurationKeys.TASK_DATA_ROOT_DIR_KEY);
-      this.jobState.setProp(ConfigurationKeys.WRITER_STAGING_DIR,
-          new Path(workingDir, TASK_STAGING_DIR_NAME).toString());
+      this.jobState
+          .setProp(ConfigurationKeys.WRITER_STAGING_DIR, new Path(workingDir, 
TASK_STAGING_DIR_NAME).toString());
       LOG.info(String.format("Writer Staging Directory is set to %s.",
           this.jobState.getProp(ConfigurationKeys.WRITER_STAGING_DIR)));
     }
@@ -367,8 +368,8 @@ public class JobContext implements Closeable {
     } else {
       String workingDir = 
this.jobState.getProp(ConfigurationKeys.TASK_DATA_ROOT_DIR_KEY);
       this.jobState.setProp(ConfigurationKeys.WRITER_OUTPUT_DIR, new 
Path(workingDir, TASK_OUTPUT_DIR_NAME).toString());
-      LOG.info(String.format("Writer Output Directory is set to %s.",
-          this.jobState.getProp(ConfigurationKeys.WRITER_OUTPUT_DIR)));
+      LOG.info(String
+          .format("Writer Output Directory is set to %s.", 
this.jobState.getProp(ConfigurationKeys.WRITER_OUTPUT_DIR)));
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/b39bf8ca/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/JobState.java
----------------------------------------------------------------------
diff --git 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/JobState.java 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/JobState.java
index 6c16de1..47356fc 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/JobState.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/JobState.java
@@ -27,6 +27,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 
+import org.apache.gobblin.metastore.DatasetStateStore;
 import org.apache.hadoop.io.Text;
 
 import com.codahale.metrics.Counter;
@@ -131,6 +132,7 @@ public class JobState extends SourceState {
   private final Map<String, TaskState> taskStates = Maps.newLinkedHashMap();
   // Skipped task states shouldn't be exposed to publisher, but they need to 
be in JobState and DatasetState so that they can be written to StateStore.
   private final Map<String, TaskState> skippedTaskStates = 
Maps.newLinkedHashMap();
+  private DatasetStateStore datasetStateStore;
 
   // Necessary for serialization/deserialization
   public JobState() {
@@ -142,6 +144,13 @@ public class JobState extends SourceState {
     this.setId(jobId);
   }
 
+  public JobState(State properties,String jobName, String jobId) {
+    super(properties);
+    this.jobName = jobName;
+    this.jobId = jobId;
+    this.setId(jobId);
+  }
+
   public JobState(State properties, Map<String, JobState.DatasetState> previousDatasetStates, String jobName,
       String jobId) {
     super(properties, previousDatasetStates, workUnitStatesFromDatasetStates(previousDatasetStates.values()));
@@ -728,7 +737,7 @@ public class JobState extends SourceState {
     return datasetState;
   }
 
-  private static List<WorkUnitState> workUnitStatesFromDatasetStates(Iterable<JobState.DatasetState> datasetStates) {
+  public static List<WorkUnitState> workUnitStatesFromDatasetStates(Iterable<JobState.DatasetState> datasetStates) {
     ImmutableList.Builder<WorkUnitState> taskStateBuilder = ImmutableList.builder();
     for (JobState datasetState : datasetStates) {
       taskStateBuilder.addAll(datasetState.getTaskStatesAsWorkUnitStates());

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/b39bf8ca/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/MysqlDatasetStateStore.java
----------------------------------------------------------------------
diff --git 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/MysqlDatasetStateStore.java
 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/MysqlDatasetStateStore.java
index 22a9718..309e4c5 100644
--- 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/MysqlDatasetStateStore.java
+++ 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/MysqlDatasetStateStore.java
@@ -102,7 +102,7 @@ public class MysqlDatasetStateStore extends MysqlStateStore<JobState.DatasetStat
   public JobState.DatasetState getLatestDatasetState(String storeName, String datasetUrn) throws IOException {
     String alias =
         Strings.isNullOrEmpty(datasetUrn) ? CURRENT_DATASET_STATE_FILE_SUFFIX + DATASET_STATE_STORE_TABLE_SUFFIX
-            : datasetUrn + "-" + CURRENT_DATASET_STATE_FILE_SUFFIX + DATASET_STATE_STORE_TABLE_SUFFIX;
+            : CharMatcher.is(':').replaceFrom(datasetUrn, '.') + "-" + CURRENT_DATASET_STATE_FILE_SUFFIX + DATASET_STATE_STORE_TABLE_SUFFIX;
     return get(storeName, alias, datasetUrn);
   }
 

Reply via email to