This is an automated email from the ASF dual-hosted git repository.

hutran pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new e5297bc  [GOBBLIN-716] Add lineage in FileBasedSource
e5297bc is described below

commit e5297bcba910c1b6d781fc9b344f8c35c95da8d3
Author: zhchen <[email protected]>
AuthorDate: Fri Mar 29 11:48:36 2019 -0700

    [GOBBLIN-716] Add lineage in FileBasedSource
    
    Closes #2583 from zxcware/fslineage
---
 .../gobblin/configuration/ConfigurationKeys.java   |   2 +
 .../source/DatePartitionedNestedRetriever.java     |  11 +-
 .../source/PartitionAwareFileRetriever.java        |  31 ++-
 .../gobblin/source/PartitionedFileSourceBase.java  |  33 ++-
 .../source/RegexBasedPartitionedRetriever.java     |   6 +-
 .../extractor/filebased/FileBasedSource.java       |  44 ++++
 .../source/RegexBasedPartitionedRetrieverTest.java |   1 +
 .../extractor/filebased/FileBasedSourceTest.java   | 248 ++++++++++++++-------
 .../gobblin/metrics/event/lineage/LineageInfo.java |  13 +-
 .../metrics/event/lineage/LineageEventTest.java    |  41 ++++
 10 files changed, 330 insertions(+), 100 deletions(-)

diff --git a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
index 00e71eb..c6ba28d 100644
--- a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
+++ b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
@@ -270,6 +270,7 @@ public class ConfigurationKeys {
   public static final String WORK_UNIT_STATE_RUNTIME_HIGH_WATER_MARK = 
"workunit.state.runtime.high.water.mark";
   public static final String WORK_UNIT_STATE_ACTUAL_HIGH_WATER_MARK_KEY = 
"workunit.state.actual.high.water.mark";
   public static final String WORK_UNIT_DATE_PARTITION_KEY = 
"workunit.source.date.partition";
+  public static final String WORK_UNIT_DATE_PARTITION_NAME = 
"workunit.source.date.partitionName";
 
   /**
    * Task execution properties.
@@ -541,6 +542,7 @@ public class ConfigurationKeys {
    * Configuration properties used by the FileBasedExtractor
    */
   public static final String SOURCE_FILEBASED_DATA_DIRECTORY = 
"source.filebased.data.directory";
+  public static final String SOURCE_FILEBASED_PLATFORM = 
"source.filebased.platform";
   public static final String SOURCE_FILEBASED_FILES_TO_PULL = 
"source.filebased.files.to.pull";
   public static final String SOURCE_FILEBASED_MAX_FILES_PER_RUN = 
"source.filebased.maxFilesPerRun";
   public static final String SOURCE_FILEBASED_FS_SNAPSHOT = 
"source.filebased.fs.snapshot";
diff --git a/gobblin-core/src/main/java/org/apache/gobblin/source/DatePartitionedNestedRetriever.java b/gobblin-core/src/main/java/org/apache/gobblin/source/DatePartitionedNestedRetriever.java
index 0dcad65..0ed42da 100644
--- a/gobblin-core/src/main/java/org/apache/gobblin/source/DatePartitionedNestedRetriever.java
+++ b/gobblin-core/src/main/java/org/apache/gobblin/source/DatePartitionedNestedRetriever.java
@@ -117,13 +117,16 @@ public class DatePartitionedNestedRetriever implements 
PartitionAwareFileRetriev
     for (DateTime date = lowWaterMarkDate; !date.isAfter(currentDay) && 
filesToProcess.size() < maxFilesToReturn;
         date = date.withFieldAdded(incrementalUnit, 1)) {
 
+      // Constructs the partition path - e.g. prefix/2015/01/01/suffix
+      String partitionPath = constructPartitionPath(date);
       // Constructs the path folder - e.g. /my/data/prefix/2015/01/01/suffix
-      Path sourcePath = constructSourcePath(date);
+      Path sourcePath = new Path(sourceDir, partitionPath);
 
       if (this.fs.exists(sourcePath)) {
         for (FileStatus fileStatus : getFilteredFileStatuses(sourcePath, 
getFileFilter())) {
           LOG.info("Will process file " + fileStatus.getPath());
-          filesToProcess.add(new FileInfo(fileStatus.getPath().toString(), 
fileStatus.getLen(), date.getMillis()));
+          filesToProcess.add(
+              new FileInfo(fileStatus.getPath().toString(), 
fileStatus.getLen(), date.getMillis(), partitionPath));
         }
       }
     }
@@ -186,7 +189,7 @@ public class DatePartitionedNestedRetriever implements 
PartitionAwareFileRetriev
     this.incrementalUnit = 
partitionType.getDateTimeFieldType().getDurationType();
   }
 
-  private Path constructSourcePath(DateTime date) {
+  private String constructPartitionPath(DateTime date) {
     StringBuilder pathBuilder = new StringBuilder();
 
     if (!this.sourcePartitionPrefix.isEmpty()) {
@@ -201,7 +204,7 @@ public class DatePartitionedNestedRetriever implements 
PartitionAwareFileRetriev
       pathBuilder.append(this.sourcePartitionSuffix);
     }
 
-    return new Path(this.sourceDir, pathBuilder.toString());
+    return pathBuilder.toString();
   }
 
   /**
diff --git a/gobblin-core/src/main/java/org/apache/gobblin/source/PartitionAwareFileRetriever.java b/gobblin-core/src/main/java/org/apache/gobblin/source/PartitionAwareFileRetriever.java
index c1df008..a8a90b9 100644
--- a/gobblin-core/src/main/java/org/apache/gobblin/source/PartitionAwareFileRetriever.java
+++ b/gobblin-core/src/main/java/org/apache/gobblin/source/PartitionAwareFileRetriever.java
@@ -18,6 +18,7 @@ package org.apache.gobblin.source;
 
 import java.io.IOException;
 import java.util.List;
+import java.util.Objects;
 
 import org.apache.gobblin.configuration.SourceState;
 
@@ -66,11 +67,17 @@ public interface PartitionAwareFileRetriever {
     private final String filePath;
     private final long fileSize;
     private final long watermarkMsSinceEpoch;
+    private final String partitionName;
 
-    public FileInfo(String filePath, long fileSize, long 
watermarkMsSinceEpoch) {
+    public FileInfo(String filePath, long fileSize, long 
watermarkMsSinceEpoch, String partitionName) {
       this.fileSize = fileSize;
       this.filePath = filePath;
       this.watermarkMsSinceEpoch = watermarkMsSinceEpoch;
+      this.partitionName = partitionName;
+    }
+
+    public FileInfo(String filePath, long fileSize, long 
watermarkMsSinceEpoch) {
+      this(filePath, fileSize, watermarkMsSinceEpoch, 
Long.toString(watermarkMsSinceEpoch));
     }
 
     public String getFilePath() {
@@ -85,9 +92,14 @@ public interface PartitionAwareFileRetriever {
       return fileSize;
     }
 
+    public String getPartitionName() {
+      return partitionName;
+    }
+
     @Override
     public String toString() {
-      return "FileInfo{" + "filePath='" + filePath + '\'' + ", 
watermarkMsSinceEpoch=" + watermarkMsSinceEpoch + '}';
+      return "FileInfo{" + "filePath='" + filePath + '\'' + ", 
watermarkMsSinceEpoch=" + watermarkMsSinceEpoch +
+          ", partitionName=" + partitionName + '}';
     }
 
     @Override
@@ -97,7 +109,8 @@ public interface PartitionAwareFileRetriever {
       } else if (watermarkMsSinceEpoch > o.watermarkMsSinceEpoch) {
         return 1;
       } else {
-        return filePath.compareTo(o.filePath);
+        int ret = filePath.compareTo(o.filePath);
+        return ret == 0 ? partitionName.compareTo(o.partitionName) : ret;
       }
     }
 
@@ -109,20 +122,14 @@ public interface PartitionAwareFileRetriever {
       if (o == null || getClass() != o.getClass()) {
         return false;
       }
-
       FileInfo fileInfo = (FileInfo) o;
-
-      if (watermarkMsSinceEpoch != fileInfo.watermarkMsSinceEpoch) {
-        return false;
-      }
-      return filePath != null ? filePath.equals(fileInfo.filePath) : 
fileInfo.filePath == null;
+      return fileSize == fileInfo.fileSize && watermarkMsSinceEpoch == 
fileInfo.watermarkMsSinceEpoch && Objects
+          .equals(filePath, fileInfo.filePath) && 
Objects.equals(partitionName, fileInfo.partitionName);
     }
 
     @Override
     public int hashCode() {
-      int result = filePath != null ? filePath.hashCode() : 0;
-      result = 31 * result + (int) (watermarkMsSinceEpoch ^ 
(watermarkMsSinceEpoch >>> 32));
-      return result;
+      return Objects.hash(filePath, fileSize, watermarkMsSinceEpoch, 
partitionName);
     }
   }
 }
diff --git a/gobblin-core/src/main/java/org/apache/gobblin/source/PartitionedFileSourceBase.java b/gobblin-core/src/main/java/org/apache/gobblin/source/PartitionedFileSourceBase.java
index 1b54895..83da81d 100644
--- a/gobblin-core/src/main/java/org/apache/gobblin/source/PartitionedFileSourceBase.java
+++ b/gobblin-core/src/main/java/org/apache/gobblin/source/PartitionedFileSourceBase.java
@@ -33,10 +33,16 @@ import org.slf4j.LoggerFactory;
 
 import com.google.common.base.Throwables;
 
+import lombok.extern.slf4j.Slf4j;
+
 import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.configuration.SourceState;
 import org.apache.gobblin.configuration.State;
 import org.apache.gobblin.configuration.WorkUnitState;
+import org.apache.gobblin.dataset.DatasetConstants;
+import org.apache.gobblin.dataset.DatasetDescriptor;
+import org.apache.gobblin.dataset.PartitionDescriptor;
+import org.apache.gobblin.metrics.event.lineage.LineageInfo;
 import org.apache.gobblin.source.extractor.Extractor;
 import org.apache.gobblin.source.extractor.filebased.FileBasedHelperException;
 import org.apache.gobblin.source.extractor.filebased.FileBasedSource;
@@ -58,6 +64,7 @@ import 
org.apache.gobblin.writer.partitioner.TimeBasedAvroWriterPartitioner;
  * {@link ConfigurationKeys#SOURCE_FILEBASED_DATA_DIRECTORY}. It relies on a 
{@link PartitionAwareFileRetriever}
  * to actually retrieve the files in a given partition.
  */
+@Slf4j
 public abstract class PartitionedFileSourceBase<SCHEMA, DATA> extends 
FileBasedSource<SCHEMA, DATA> {
 
   // Configuration parameters
@@ -197,6 +204,7 @@ public abstract class PartitionedFileSourceBase<SCHEMA, 
DATA> extends FileBasedS
   @Override
   public List<WorkUnit> getWorkunits(SourceState state) {
 
+    lineageInfo = LineageInfo.getLineageInfo(state.getBroker());
     DateTimeFormatter formatter = DateTimeFormat.fullDateTime();
 
     // Initialize all instance variables for this object
@@ -222,7 +230,28 @@ public abstract class PartitionedFileSourceBase<SCHEMA, 
DATA> extends FileBasedS
 
     addNewWorkUnits(multiWorkUnitWeightedQueue);
 
-    return multiWorkUnitWeightedQueue.getQueueAsList();
+    List<WorkUnit> workUnits = multiWorkUnitWeightedQueue.getQueueAsList();
+    addLineageSourceInfo(workUnits, state);
+
+    return workUnits;
+  }
+
+  @Override
+  protected void addLineageSourceInfo(WorkUnit workUnit, State state) {
+    if (!lineageInfo.isPresent()) {
+      log.info("Lineage is not enabled");
+      return;
+    }
+
+    String platform = 
state.getProp(ConfigurationKeys.SOURCE_FILEBASED_PLATFORM, 
DatasetConstants.PLATFORM_HDFS);
+    Path dataDir = new 
Path(state.getProp(ConfigurationKeys.SOURCE_FILEBASED_DATA_DIRECTORY));
+    String dataset = Path.getPathWithoutSchemeAndAuthority(dataDir).toString();
+    DatasetDescriptor datasetDescriptor = new DatasetDescriptor(platform, 
dataset);
+
+    String partitionName = 
workUnit.getProp(ConfigurationKeys.WORK_UNIT_DATE_PARTITION_NAME);
+    PartitionDescriptor descriptor = new PartitionDescriptor(partitionName, 
datasetDescriptor);
+
+    lineageInfo.get().setSource(descriptor, workUnit);
   }
 
   /**
@@ -290,7 +319,7 @@ public abstract class PartitionedFileSourceBase<SCHEMA, 
DATA> extends FileBasedS
         singleWorkUnit.setProp(ConfigurationKeys.WORK_UNIT_LOW_WATER_MARK_KEY, 
file.getWatermarkMsSinceEpoch());
         
singleWorkUnit.setProp(ConfigurationKeys.WORK_UNIT_HIGH_WATER_MARK_KEY, 
file.getWatermarkMsSinceEpoch());
         singleWorkUnit.setProp(ConfigurationKeys.WORK_UNIT_DATE_PARTITION_KEY, 
file.getWatermarkMsSinceEpoch());
-
+        
singleWorkUnit.setProp(ConfigurationKeys.WORK_UNIT_DATE_PARTITION_NAME, 
file.getPartitionName());
         if 
(this.sourceState.getPropAsBoolean(ConfigurationKeys.SCHEMA_IN_SOURCE_DIR,
             ConfigurationKeys.DEFAULT_SCHEMA_IN_SOURCE_DIR)) {
           addSchemaFile(file, singleWorkUnit);
diff --git a/gobblin-core/src/main/java/org/apache/gobblin/source/RegexBasedPartitionedRetriever.java b/gobblin-core/src/main/java/org/apache/gobblin/source/RegexBasedPartitionedRetriever.java
index 7d3ad92..82bd850 100644
--- a/gobblin-core/src/main/java/org/apache/gobblin/source/RegexBasedPartitionedRetriever.java
+++ b/gobblin-core/src/main/java/org/apache/gobblin/source/RegexBasedPartitionedRetriever.java
@@ -38,6 +38,7 @@ import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.configuration.SourceState;
 import org.apache.gobblin.source.extractor.filebased.FileBasedHelperException;
 import org.apache.gobblin.source.extractor.hadoop.HadoopFsHelper;
+import org.apache.gobblin.util.PathUtils;
 
 
 public class RegexBasedPartitionedRetriever implements 
PartitionAwareFileRetriever {
@@ -114,7 +115,8 @@ public class RegexBasedPartitionedRetriever implements 
PartitionAwareFileRetriev
           filesToProcess.add(new FileInfo(
               file.getPath().toString(),
               file.getLen(),
-              outerDirectory.getWatermarkMsSinceEpoch()
+              outerDirectory.getWatermarkMsSinceEpoch(),
+              outerDirectory.getPartitionName()
           ));
         }
 
@@ -152,7 +154,7 @@ public class RegexBasedPartitionedRetriever implements 
PartitionAwareFileRetriev
           outerDirectories.add(new FileInfo(
               file.getPath().toString(),
               0,
-              watermark
+              watermark, PathUtils.relativizePath(file.getPath(), 
sourceDir).toString()
           ));
         } else {
           LOGGER.info("Ignoring directory {} - watermark {} is not between 
minWatermark {} and (now-leadTime) {}",
diff --git a/gobblin-core/src/main/java/org/apache/gobblin/source/extractor/filebased/FileBasedSource.java b/gobblin-core/src/main/java/org/apache/gobblin/source/extractor/filebased/FileBasedSource.java
index e34a28f..14730e8 100644
--- a/gobblin-core/src/main/java/org/apache/gobblin/source/extractor/filebased/FileBasedSource.java
+++ b/gobblin-core/src/main/java/org/apache/gobblin/source/extractor/filebased/FileBasedSource.java
@@ -30,10 +30,12 @@ import java.util.List;
 import java.util.Set;
 
 import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.fs.Path;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.slf4j.MDC;
 
+import com.google.common.base.Optional;
 import com.google.common.base.Strings;
 import com.google.common.base.Throwables;
 import com.google.common.collect.Lists;
@@ -43,8 +45,12 @@ import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.configuration.SourceState;
 import org.apache.gobblin.configuration.State;
 import org.apache.gobblin.configuration.WorkUnitState;
+import org.apache.gobblin.dataset.DatasetConstants;
+import org.apache.gobblin.dataset.DatasetDescriptor;
+import org.apache.gobblin.metrics.event.lineage.LineageInfo;
 import org.apache.gobblin.source.extractor.extract.AbstractSource;
 import org.apache.gobblin.source.workunit.Extract;
+import org.apache.gobblin.source.workunit.MultiWorkUnit;
 import org.apache.gobblin.source.workunit.WorkUnit;
 import org.apache.gobblin.source.workunit.Extract.TableType;
 
@@ -60,6 +66,8 @@ public abstract class FileBasedSource<S, D> extends 
AbstractSource<S, D> {
   protected TimestampAwareFileBasedHelper fsHelper;
   protected String splitPattern = ":::";
 
+  protected Optional<LineageInfo> lineageInfo;
+
   /**
    * Initialize the logger.
    *
@@ -85,6 +93,8 @@ public abstract class FileBasedSource<S, D> extends 
AbstractSource<S, D> {
   @Override
   public List<WorkUnit> getWorkunits(SourceState state) {
     initLogger(state);
+    lineageInfo = LineageInfo.getLineageInfo(state.getBroker());
+
     try {
       initFileSystemHelper(state);
     } catch (FileBasedHelperException e) {
@@ -199,10 +209,44 @@ public abstract class FileBasedSource<S, D> extends 
AbstractSource<S, D> {
       log.info("Total number of work units for the current run: " + 
(workUnits.size() - previousWorkUnitsForRetry.size()));
     }
 
+    addLineageSourceInfo(workUnits, state);
     return workUnits;
   }
 
   /**
+   * Add lineage source info to a list of work units, it can have instances of
+   * {@link org.apache.gobblin.source.workunit.MultiWorkUnit}
+   */
+  protected void addLineageSourceInfo(List<WorkUnit> workUnits, State state) {
+    workUnits.forEach(workUnit -> {
+      if (workUnit instanceof MultiWorkUnit) {
+        ((MultiWorkUnit) workUnit).getWorkUnits().forEach((wu -> 
addLineageSourceInfo(wu, state)));
+      } else {
+        addLineageSourceInfo(workUnit, state);
+      }
+    });
+  }
+
+  /**
+   * Add lineage source info to a single work unit
+   *
+   * @param workUnit a single work unit, not an instance of {@link 
org.apache.gobblin.source.workunit.MultiWorkUnit}
+   * @param state configurations
+   */
+  protected void addLineageSourceInfo(WorkUnit workUnit, State state) {
+    if (!lineageInfo.isPresent()) {
+      log.info("Lineage is not enabled");
+      return;
+    }
+
+    String platform = 
state.getProp(ConfigurationKeys.SOURCE_FILEBASED_PLATFORM, 
DatasetConstants.PLATFORM_HDFS);
+    Path dataDir = new 
Path(state.getProp(ConfigurationKeys.SOURCE_FILEBASED_DATA_DIRECTORY));
+    String dataset = Path.getPathWithoutSchemeAndAuthority(dataDir).toString();
+    DatasetDescriptor source = new DatasetDescriptor(platform, dataset);
+    lineageInfo.get().setSource(source, workUnit);
+  }
+
+  /**
    * This method is responsible for connecting to the source and taking
    * a snapshot of the folder where the data is present, it then returns
    * a list of the files in String format
diff --git a/gobblin-core/src/test/java/org/apache/gobblin/source/RegexBasedPartitionedRetrieverTest.java b/gobblin-core/src/test/java/org/apache/gobblin/source/RegexBasedPartitionedRetrieverTest.java
index cb35eb6..2a6d92b 100644
--- a/gobblin-core/src/test/java/org/apache/gobblin/source/RegexBasedPartitionedRetrieverTest.java
+++ b/gobblin-core/src/test/java/org/apache/gobblin/source/RegexBasedPartitionedRetrieverTest.java
@@ -130,5 +130,6 @@ public class RegexBasedPartitionedRetrieverTest {
     
Assert.assertTrue(fileInfo.getFilePath().startsWith(expectedStart.toString()));
     Assert.assertTrue(fileInfo.getFilePath().endsWith(expectedEnd));
     Assert.assertEquals(fileInfo.getFileSize(), 0);
+    Assert.assertEquals(fileInfo.getPartitionName(), 
String.format("%d-PT-123456", value));
    }
 }
diff --git a/gobblin-core/src/test/java/org/apache/gobblin/source/extractor/filebased/FileBasedSourceTest.java b/gobblin-core/src/test/java/org/apache/gobblin/source/extractor/filebased/FileBasedSourceTest.java
index 95b3656..15d9616 100644
--- a/gobblin-core/src/test/java/org/apache/gobblin/source/extractor/filebased/FileBasedSourceTest.java
+++ b/gobblin-core/src/test/java/org/apache/gobblin/source/extractor/filebased/FileBasedSourceTest.java
@@ -17,112 +17,204 @@
 
 package org.apache.gobblin.source.extractor.filebased;
 
+import org.apache.gobblin.broker.SharedResourcesBrokerFactory;
+import org.apache.gobblin.broker.gobblin_scopes.GobblinScopeTypes;
+import org.apache.gobblin.broker.gobblin_scopes.JobScopeInstance;
+import org.apache.gobblin.broker.iface.SharedResourcesBroker;
 import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.configuration.SourceState;
 import org.apache.gobblin.configuration.State;
 import org.apache.gobblin.configuration.WorkUnitState;
+import org.apache.gobblin.dataset.DatasetConstants;
+import org.apache.gobblin.dataset.DatasetDescriptor;
+import org.apache.gobblin.dataset.Descriptor;
+import org.apache.gobblin.dataset.PartitionDescriptor;
 import org.apache.gobblin.source.DatePartitionedJsonFileSource;
 import org.apache.gobblin.source.PartitionedFileSourceBase;
 import org.apache.gobblin.source.extractor.DataRecordException;
 import org.apache.gobblin.source.extractor.Extractor;
+import org.apache.gobblin.source.extractor.hadoop.AvroFileSource;
 import org.apache.gobblin.source.workunit.Extract;
+import org.apache.gobblin.source.workunit.MultiWorkUnit;
 import org.apache.gobblin.source.workunit.WorkUnit;
+
 import org.apache.hadoop.fs.Path;
 import org.junit.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 import org.testng.collections.Lists;
 
 import java.io.IOException;
 import java.util.List;
+import java.util.Set;
+
+import com.google.common.collect.Sets;
+import com.typesafe.config.ConfigFactory;
 
 
 @Test
 public class FileBasedSourceTest {
-    @Test
-    public void testFailJobWhenPreviousStateExistsButDoesNotHaveSnapshot() {
-        try {
-            DummyFileBasedSource source = new DummyFileBasedSource();
-
-            WorkUnitState workUnitState = new WorkUnitState();
-            workUnitState.setId("priorState");
-            List<WorkUnitState> workUnitStates = 
Lists.newArrayList(workUnitState);
-
-            State state = new State();
-            state.setProp(ConfigurationKeys.EXTRACT_TABLE_TYPE_KEY, 
Extract.TableType.SNAPSHOT_ONLY.toString());
-            
state.setProp(ConfigurationKeys.SOURCE_FILEBASED_FS_PRIOR_SNAPSHOT_REQUIRED, 
true);
-
-            SourceState sourceState = new SourceState(state, workUnitStates);
-
-            source.getWorkunits(sourceState);
-            Assert.fail("Expected RuntimeException, but no exceptions were 
thrown.");
-        } catch (RuntimeException e) {
-            Assert.assertEquals("No 'source.filebased.fs.snapshot' found on 
state of prior job",
-                e.getMessage());
-        }
+
+  private static final String SOURCE_LINEAGE_KEY = 
"gobblin.event.lineage.source";
+  SharedResourcesBroker<GobblinScopeTypes> instanceBroker;
+  SharedResourcesBroker<GobblinScopeTypes> jobBroker;
+  Path sourceDir;
+
+  @BeforeClass
+  public void setup() {
+    instanceBroker = SharedResourcesBrokerFactory
+        .createDefaultTopLevelBroker(ConfigFactory.empty(), 
GobblinScopeTypes.GLOBAL.defaultScopeInstance());
+    jobBroker = instanceBroker
+        .newSubscopedBuilder(new JobScopeInstance("LineageEventTest", 
String.valueOf(System.currentTimeMillis())))
+        .build();
+    sourceDir = new Path(getClass().getResource("/source").toString());
+  }
+
+  @Test
+  public void testFailJobWhenPreviousStateExistsButDoesNotHaveSnapshot() {
+    try {
+      DummyFileBasedSource source = new DummyFileBasedSource();
+
+      WorkUnitState workUnitState = new WorkUnitState();
+      workUnitState.setId("priorState");
+      List<WorkUnitState> workUnitStates = Lists.newArrayList(workUnitState);
+
+      State state = new State();
+      state.setProp(ConfigurationKeys.EXTRACT_TABLE_TYPE_KEY, 
Extract.TableType.SNAPSHOT_ONLY.toString());
+      
state.setProp(ConfigurationKeys.SOURCE_FILEBASED_FS_PRIOR_SNAPSHOT_REQUIRED, 
true);
+
+      SourceState sourceState = new SourceState(state, workUnitStates);
+
+      source.getWorkunits(sourceState);
+      Assert.fail("Expected RuntimeException, but no exceptions were thrown.");
+    } catch (RuntimeException e) {
+      Assert.assertEquals("No 'source.filebased.fs.snapshot' found on state of 
prior job", e.getMessage());
+    }
+  }
+
+  @Test
+  void numberOfWorkUnits()
+      throws IOException {
+    SourceState sourceState = new SourceState();
+    sourceState.setBroker(jobBroker);
+    DatePartitionedJsonFileSource source = new DatePartitionedJsonFileSource();
+    initState(sourceState);
+    List<WorkUnit> workUnits = source.getWorkunits(sourceState);
+    Assert.assertEquals(3, workUnits.size());
+  }
+
+  @Test
+  public void testSourceLineage() {
+    String dataset = 
Path.getPathWithoutSchemeAndAuthority(sourceDir).toString();
+
+    SourceState sourceState = new SourceState();
+    sourceState.setBroker(jobBroker);
+    initState(sourceState);
+
+    // Avro file based source
+    AvroFileSource fileSource = new AvroFileSource();
+    List<WorkUnit> workUnits = fileSource.getWorkunits(sourceState);
+    DatasetDescriptor datasetDescriptor = new DatasetDescriptor("hdfs", 
dataset);
+    for (WorkUnit workUnit : workUnits) {
+      Assert.assertEquals(workUnit.getProp(SOURCE_LINEAGE_KEY), 
Descriptor.toJson(datasetDescriptor));
+    }
+
+    // Partitioned file based source
+    // Test platform configuration
+    sourceState.setProp(ConfigurationKeys.SOURCE_FILEBASED_PLATFORM, 
DatasetConstants.PLATFORM_FILE);
+    DatePartitionedJsonFileSource partitionedFileSource = new 
DatePartitionedJsonFileSource();
+    workUnits = partitionedFileSource.getWorkunits(sourceState);
+    datasetDescriptor = new DatasetDescriptor("file", dataset);
+
+    Set<String> partitions = Sets.newHashSet("2017-12", "2018-01");
+    for (WorkUnit workUnit : workUnits) {
+      if (workUnit instanceof MultiWorkUnit) {
+        DatasetDescriptor finalDatasetDescriptor = datasetDescriptor;
+        ((MultiWorkUnit) workUnit).getWorkUnits().forEach( wu -> 
verifyPartitionSourceLineage(wu, partitions,
+            finalDatasetDescriptor));
+      } else {
+        verifyPartitionSourceLineage(workUnit, partitions, datasetDescriptor);
+      }
+    }
+  }
+
+  private void verifyPartitionSourceLineage(WorkUnit wu, Set<String> 
partitions, DatasetDescriptor datasetDescriptor) {
+    PartitionDescriptor descriptor = (PartitionDescriptor) 
Descriptor.fromJson(wu.getProp(SOURCE_LINEAGE_KEY));
+    Assert.assertTrue(partitions.contains(descriptor.getName()));
+    Assert.assertEquals(descriptor.getDataset(), datasetDescriptor);
+  }
+
+  private void initState(State state) {
+    state.setProp(ConfigurationKeys.SOURCE_FILEBASED_DATA_DIRECTORY, 
sourceDir.toString());
+    
state.setProp(PartitionedFileSourceBase.DATE_PARTITIONED_SOURCE_PARTITION_PATTERN,
 "yyyy-MM");
+    
state.setProp(PartitionedFileSourceBase.DATE_PARTITIONED_SOURCE_MIN_WATERMARK_VALUE,
 "2017-11");
+    state.setProp(ConfigurationKeys.EXTRACT_TABLE_TYPE_KEY, "snapshot_only");
+    state.setProp(ConfigurationKeys.SOURCE_FILEBASED_FS_URI, "file:///");
+    state.setProp(ConfigurationKeys.SCHEMA_IN_SOURCE_DIR, "true");
+    state.setProp(ConfigurationKeys.SCHEMA_FILENAME, "metadata.json");
+  }
+
+  @AfterClass
+  public void cleanup()
+      throws IOException {
+    if (jobBroker != null) {
+      jobBroker.close();
+    }
+
+    if (instanceBroker != null) {
+      instanceBroker.close();
+    }
+  }
+
+  private static class DummyFileBasedSource extends FileBasedSource<String, 
String> {
+    @Override
+    public void initFileSystemHelper(State state)
+        throws FileBasedHelperException {
+    }
+
+    @Override
+    protected List<WorkUnit> getPreviousWorkUnitsForRetry(SourceState state) {
+      return Lists.newArrayList();
+    }
+
+    @Override
+    public List<String> getcurrentFsSnapshot(State state) {
+      return Lists.newArrayList("SnapshotEntry");
+    }
+
+    @Override
+    public Extractor<String, String> getExtractor(WorkUnitState state)
+        throws IOException {
+      return new DummyExtractor();
+    }
+  }
+
+  private static class DummyExtractor implements Extractor<String, String> {
+    @Override
+    public String getSchema() {
+      return "";
     }
 
-    @Test void numberOfWorkUnits() throws IOException {
-        SourceState sourceState = new SourceState();
-        DatePartitionedJsonFileSource source = new 
DatePartitionedJsonFileSource();
-        initState(sourceState);
-        List<WorkUnit> workUnits = source.getWorkunits(sourceState);
-        Assert.assertEquals(3, workUnits.size());
+    @Override
+    public String readRecord(String reuse)
+        throws DataRecordException, IOException {
+      return null;
     }
 
-    private void initState(State state) {
-        state.setProp(ConfigurationKeys.SOURCE_FILEBASED_DATA_DIRECTORY,
-            new Path(getClass().getResource("/source").toString()).toString());
-        
state.setProp(PartitionedFileSourceBase.DATE_PARTITIONED_SOURCE_PARTITION_PATTERN,
 "yyyy-MM");
-        
state.setProp(PartitionedFileSourceBase.DATE_PARTITIONED_SOURCE_MIN_WATERMARK_VALUE,
 "2017-11");
-        state.setProp(ConfigurationKeys.EXTRACT_TABLE_TYPE_KEY, 
"snapshot_only");
-        state.setProp(ConfigurationKeys.SOURCE_FILEBASED_FS_URI, "file:///");
-        state.setProp(ConfigurationKeys.SCHEMA_IN_SOURCE_DIR, "true");
-        state.setProp(ConfigurationKeys.SCHEMA_FILENAME, "metadata.json");
+    @Override
+    public long getExpectedRecordCount() {
+      return 0;
     }
 
-    private static class DummyFileBasedSource extends FileBasedSource<String, 
String> {
-        @Override
-        public void initFileSystemHelper(State state) throws 
FileBasedHelperException {
-        }
-
-        @Override
-        protected List<WorkUnit> getPreviousWorkUnitsForRetry(SourceState 
state) {
-            return Lists.newArrayList();
-        }
-
-        @Override
-        public List<String> getcurrentFsSnapshot(State state) {
-            return Lists.newArrayList("SnapshotEntry");
-        }
-
-        @Override
-        public Extractor<String, String> getExtractor(WorkUnitState state) 
throws IOException {
-            return new DummyExtractor();
-        }
+    @Override
+    public long getHighWatermark() {
+      return 0;
     }
 
-    private static class DummyExtractor implements Extractor<String, String> {
-        @Override
-        public String getSchema() {
-            return "";
-        }
-
-        @Override
-        public String readRecord(String reuse) throws DataRecordException, 
IOException {
-            return null;
-        }
-
-        @Override
-        public long getExpectedRecordCount() {
-            return 0;
-        }
-
-        @Override
-        public long getHighWatermark() {
-            return 0;
-        }
-
-        @Override
-        public void close() throws IOException {}
+    @Override
+    public void close()
+        throws IOException {
     }
+  }
 }
diff --git a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/lineage/LineageInfo.java b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/lineage/LineageInfo.java
index 3e65105..ed8903d 100644
--- a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/lineage/LineageInfo.java
+++ b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/lineage/LineageInfo.java
@@ -24,6 +24,8 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
+import org.apache.commons.lang3.StringUtils;
+
 import com.google.common.base.Joiner;
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
@@ -158,8 +160,15 @@ public final class LineageInfo {
         resolvedDescriptors.add(resolvedDescriptor);
       }
 
-      state.setProp(getKey(BRANCH, branchId, LineageEventBuilder.DESTINATION),
-          Descriptor.toJson(resolvedDescriptors));
+      String destinationKey = getKey(BRANCH, branchId, 
LineageEventBuilder.DESTINATION);
+      String currentDestinations = state.getProp(destinationKey);
+      List<Descriptor> allDescriptors = Lists.newArrayList();
+      if (StringUtils.isNotEmpty(currentDestinations)) {
+        allDescriptors = Descriptor.fromJsonList(currentDestinations);
+      }
+      allDescriptors.addAll(resolvedDescriptors);
+
+      state.setProp(destinationKey, Descriptor.toJson(allDescriptors));
     }
   }
 
diff --git a/gobblin-metrics-libs/gobblin-metrics-base/src/test/java/org/apache/gobblin/metrics/event/lineage/LineageEventTest.java b/gobblin-metrics-libs/gobblin-metrics-base/src/test/java/org/apache/gobblin/metrics/event/lineage/LineageEventTest.java
index 5fd7952..e792a7f 100644
--- a/gobblin-metrics-libs/gobblin-metrics-base/src/test/java/org/apache/gobblin/metrics/event/lineage/LineageEventTest.java
+++ b/gobblin-metrics-libs/gobblin-metrics-base/src/test/java/org/apache/gobblin/metrics/event/lineage/LineageEventTest.java
@@ -138,6 +138,36 @@ public class LineageEventTest {
     Assert.assertEquals(LineageEventBuilder.fromEvent(trackingEvent), event);
   }
 
+  @Test
+  public void testMultiPuts() {
+    final String topic = "testTopic";
+    final String kafka = "kafka";
+    final String hdfs = "hdfs";
+    final String path = "/data/tracking/PageViewEvent";
+    final String partitionName = "hourly/2018/08/15/15";
+
+    State state = new State();
+    LineageInfo lineageInfo = getLineageInfo();
+    DatasetDescriptor source = new DatasetDescriptor(kafka, topic);
+    lineageInfo.setSource(source, state);
+    DatasetDescriptor destinationDataset = new DatasetDescriptor(hdfs, path);
+    PartitionDescriptor destination = new PartitionDescriptor(partitionName, 
destinationDataset);
+    lineageInfo.putDestination(Lists.newArrayList(destination), 0, state);
+
+    // Put another destination
+    DatasetDescriptor destinationDataset2 = new DatasetDescriptor(kafka, 
"nextTopic");
+    lineageInfo.putDestination(Lists.newArrayList(destinationDataset2), 0, 
state);
+
+    Map<String, Set<LineageEventBuilder>> eventsMap = LineageInfo.load(state);
+    Assert.assertEquals(eventsMap.size(), 1);
+
+    Set<LineageEventBuilder> events = eventsMap.get("0");
+    Assert.assertEquals(events.size(), 2);
+
+    verifyOne(events, topic, source, destination);
+    verifyOne(events, topic, source, destinationDataset2);
+  }
+
   private LineageEventBuilder getLineageEvent(Collection<LineageEventBuilder> 
events, int branchId, String destinationPlatform) {
     for (LineageEventBuilder event : events) {
       DatasetDescriptor descriptor = (DatasetDescriptor) 
event.getDestination();
@@ -175,4 +205,15 @@ public class LineageEventTest {
   private <T> T first(Collection<T> collection) {
     return collection.iterator().next();
   }
+
+  private void verifyOne(Collection<LineageEventBuilder> collection, String 
name, Descriptor source, Descriptor destination) {
+    for (LineageEventBuilder event : collection) {
+      if (event.getDestination().equals(destination)) {
+        verify(event, name, source, destination);
+        return;
+      }
+    }
+
+    Assert.fail("Could not find a matching lineage with destination: " + 
destination);
+  }
 }

Reply via email to