This is an automated email from the ASF dual-hosted git repository.
hutran pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new e5297bc [GOBBLIN-716] Add lineage in FileBasedSource
e5297bc is described below
commit e5297bcba910c1b6d781fc9b344f8c35c95da8d3
Author: zhchen <[email protected]>
AuthorDate: Fri Mar 29 11:48:36 2019 -0700
[GOBBLIN-716] Add lineage in FileBasedSource
Closes #2583 from zxcware/fslineage
---
.../gobblin/configuration/ConfigurationKeys.java | 2 +
.../source/DatePartitionedNestedRetriever.java | 11 +-
.../source/PartitionAwareFileRetriever.java | 31 ++-
.../gobblin/source/PartitionedFileSourceBase.java | 33 ++-
.../source/RegexBasedPartitionedRetriever.java | 6 +-
.../extractor/filebased/FileBasedSource.java | 44 ++++
.../source/RegexBasedPartitionedRetrieverTest.java | 1 +
.../extractor/filebased/FileBasedSourceTest.java | 248 ++++++++++++++-------
.../gobblin/metrics/event/lineage/LineageInfo.java | 13 +-
.../metrics/event/lineage/LineageEventTest.java | 41 ++++
10 files changed, 330 insertions(+), 100 deletions(-)
diff --git a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
index 00e71eb..c6ba28d 100644
--- a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
+++ b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
@@ -270,6 +270,7 @@ public class ConfigurationKeys {
public static final String WORK_UNIT_STATE_RUNTIME_HIGH_WATER_MARK =
"workunit.state.runtime.high.water.mark";
public static final String WORK_UNIT_STATE_ACTUAL_HIGH_WATER_MARK_KEY =
"workunit.state.actual.high.water.mark";
public static final String WORK_UNIT_DATE_PARTITION_KEY =
"workunit.source.date.partition";
+ public static final String WORK_UNIT_DATE_PARTITION_NAME =
"workunit.source.date.partitionName";
/**
* Task execution properties.
@@ -541,6 +542,7 @@ public class ConfigurationKeys {
* Configuration properties used by the FileBasedExtractor
*/
public static final String SOURCE_FILEBASED_DATA_DIRECTORY =
"source.filebased.data.directory";
+ public static final String SOURCE_FILEBASED_PLATFORM =
"source.filebased.platform";
public static final String SOURCE_FILEBASED_FILES_TO_PULL =
"source.filebased.files.to.pull";
public static final String SOURCE_FILEBASED_MAX_FILES_PER_RUN =
"source.filebased.maxFilesPerRun";
public static final String SOURCE_FILEBASED_FS_SNAPSHOT =
"source.filebased.fs.snapshot";
diff --git a/gobblin-core/src/main/java/org/apache/gobblin/source/DatePartitionedNestedRetriever.java b/gobblin-core/src/main/java/org/apache/gobblin/source/DatePartitionedNestedRetriever.java
index 0dcad65..0ed42da 100644
--- a/gobblin-core/src/main/java/org/apache/gobblin/source/DatePartitionedNestedRetriever.java
+++ b/gobblin-core/src/main/java/org/apache/gobblin/source/DatePartitionedNestedRetriever.java
@@ -117,13 +117,16 @@ public class DatePartitionedNestedRetriever implements PartitionAwareFileRetriev
for (DateTime date = lowWaterMarkDate; !date.isAfter(currentDay) &&
filesToProcess.size() < maxFilesToReturn;
date = date.withFieldAdded(incrementalUnit, 1)) {
+ // Constructs the partition path - e.g. prefix/2015/01/01/suffix
+ String partitionPath = constructPartitionPath(date);
// Constructs the path folder - e.g. /my/data/prefix/2015/01/01/suffix
- Path sourcePath = constructSourcePath(date);
+ Path sourcePath = new Path(sourceDir, partitionPath);
if (this.fs.exists(sourcePath)) {
for (FileStatus fileStatus : getFilteredFileStatuses(sourcePath,
getFileFilter())) {
LOG.info("Will process file " + fileStatus.getPath());
- filesToProcess.add(new FileInfo(fileStatus.getPath().toString(),
fileStatus.getLen(), date.getMillis()));
+ filesToProcess.add(
+ new FileInfo(fileStatus.getPath().toString(),
fileStatus.getLen(), date.getMillis(), partitionPath));
}
}
}
@@ -186,7 +189,7 @@ public class DatePartitionedNestedRetriever implements PartitionAwareFileRetriev
this.incrementalUnit =
partitionType.getDateTimeFieldType().getDurationType();
}
- private Path constructSourcePath(DateTime date) {
+ private String constructPartitionPath(DateTime date) {
StringBuilder pathBuilder = new StringBuilder();
if (!this.sourcePartitionPrefix.isEmpty()) {
@@ -201,7 +204,7 @@ public class DatePartitionedNestedRetriever implements PartitionAwareFileRetriev
pathBuilder.append(this.sourcePartitionSuffix);
}
- return new Path(this.sourceDir, pathBuilder.toString());
+ return pathBuilder.toString();
}
/**
diff --git a/gobblin-core/src/main/java/org/apache/gobblin/source/PartitionAwareFileRetriever.java b/gobblin-core/src/main/java/org/apache/gobblin/source/PartitionAwareFileRetriever.java
index c1df008..a8a90b9 100644
--- a/gobblin-core/src/main/java/org/apache/gobblin/source/PartitionAwareFileRetriever.java
+++ b/gobblin-core/src/main/java/org/apache/gobblin/source/PartitionAwareFileRetriever.java
@@ -18,6 +18,7 @@ package org.apache.gobblin.source;
import java.io.IOException;
import java.util.List;
+import java.util.Objects;
import org.apache.gobblin.configuration.SourceState;
@@ -66,11 +67,17 @@ public interface PartitionAwareFileRetriever {
private final String filePath;
private final long fileSize;
private final long watermarkMsSinceEpoch;
+ private final String partitionName;
- public FileInfo(String filePath, long fileSize, long
watermarkMsSinceEpoch) {
+ public FileInfo(String filePath, long fileSize, long
watermarkMsSinceEpoch, String partitionName) {
this.fileSize = fileSize;
this.filePath = filePath;
this.watermarkMsSinceEpoch = watermarkMsSinceEpoch;
+ this.partitionName = partitionName;
+ }
+
+ public FileInfo(String filePath, long fileSize, long
watermarkMsSinceEpoch) {
+ this(filePath, fileSize, watermarkMsSinceEpoch,
Long.toString(watermarkMsSinceEpoch));
}
public String getFilePath() {
@@ -85,9 +92,14 @@ public interface PartitionAwareFileRetriever {
return fileSize;
}
+ public String getPartitionName() {
+ return partitionName;
+ }
+
@Override
public String toString() {
- return "FileInfo{" + "filePath='" + filePath + '\'' + ",
watermarkMsSinceEpoch=" + watermarkMsSinceEpoch + '}';
+ return "FileInfo{" + "filePath='" + filePath + '\'' + ",
watermarkMsSinceEpoch=" + watermarkMsSinceEpoch +
+ ", partitionName=" + partitionName + '}';
}
@Override
@@ -97,7 +109,8 @@ public interface PartitionAwareFileRetriever {
} else if (watermarkMsSinceEpoch > o.watermarkMsSinceEpoch) {
return 1;
} else {
- return filePath.compareTo(o.filePath);
+ int ret = filePath.compareTo(o.filePath);
+ return ret == 0 ? partitionName.compareTo(o.partitionName) : ret;
}
}
@@ -109,20 +122,14 @@ public interface PartitionAwareFileRetriever {
if (o == null || getClass() != o.getClass()) {
return false;
}
-
FileInfo fileInfo = (FileInfo) o;
-
- if (watermarkMsSinceEpoch != fileInfo.watermarkMsSinceEpoch) {
- return false;
- }
- return filePath != null ? filePath.equals(fileInfo.filePath) :
fileInfo.filePath == null;
+ return fileSize == fileInfo.fileSize && watermarkMsSinceEpoch ==
fileInfo.watermarkMsSinceEpoch && Objects
+ .equals(filePath, fileInfo.filePath) &&
Objects.equals(partitionName, fileInfo.partitionName);
}
@Override
public int hashCode() {
- int result = filePath != null ? filePath.hashCode() : 0;
- result = 31 * result + (int) (watermarkMsSinceEpoch ^
(watermarkMsSinceEpoch >>> 32));
- return result;
+ return Objects.hash(filePath, fileSize, watermarkMsSinceEpoch,
partitionName);
}
}
}
diff --git a/gobblin-core/src/main/java/org/apache/gobblin/source/PartitionedFileSourceBase.java b/gobblin-core/src/main/java/org/apache/gobblin/source/PartitionedFileSourceBase.java
index 1b54895..83da81d 100644
--- a/gobblin-core/src/main/java/org/apache/gobblin/source/PartitionedFileSourceBase.java
+++ b/gobblin-core/src/main/java/org/apache/gobblin/source/PartitionedFileSourceBase.java
@@ -33,10 +33,16 @@ import org.slf4j.LoggerFactory;
import com.google.common.base.Throwables;
+import lombok.extern.slf4j.Slf4j;
+
import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.configuration.SourceState;
import org.apache.gobblin.configuration.State;
import org.apache.gobblin.configuration.WorkUnitState;
+import org.apache.gobblin.dataset.DatasetConstants;
+import org.apache.gobblin.dataset.DatasetDescriptor;
+import org.apache.gobblin.dataset.PartitionDescriptor;
+import org.apache.gobblin.metrics.event.lineage.LineageInfo;
import org.apache.gobblin.source.extractor.Extractor;
import org.apache.gobblin.source.extractor.filebased.FileBasedHelperException;
import org.apache.gobblin.source.extractor.filebased.FileBasedSource;
@@ -58,6 +64,7 @@ import org.apache.gobblin.writer.partitioner.TimeBasedAvroWriterPartitioner;
* {@link ConfigurationKeys#SOURCE_FILEBASED_DATA_DIRECTORY}. It relies on a
{@link PartitionAwareFileRetriever}
* to actually retrieve the files in a given partition.
*/
+@Slf4j
public abstract class PartitionedFileSourceBase<SCHEMA, DATA> extends
FileBasedSource<SCHEMA, DATA> {
// Configuration parameters
@@ -197,6 +204,7 @@ public abstract class PartitionedFileSourceBase<SCHEMA, DATA> extends FileBasedS
@Override
public List<WorkUnit> getWorkunits(SourceState state) {
+ lineageInfo = LineageInfo.getLineageInfo(state.getBroker());
DateTimeFormatter formatter = DateTimeFormat.fullDateTime();
// Initialize all instance variables for this object
@@ -222,7 +230,28 @@ public abstract class PartitionedFileSourceBase<SCHEMA, DATA> extends FileBasedS
addNewWorkUnits(multiWorkUnitWeightedQueue);
- return multiWorkUnitWeightedQueue.getQueueAsList();
+ List<WorkUnit> workUnits = multiWorkUnitWeightedQueue.getQueueAsList();
+ addLineageSourceInfo(workUnits, state);
+
+ return workUnits;
+ }
+
+ @Override
+ protected void addLineageSourceInfo(WorkUnit workUnit, State state) {
+ if (!lineageInfo.isPresent()) {
+ log.info("Lineage is not enabled");
+ return;
+ }
+
+ String platform =
state.getProp(ConfigurationKeys.SOURCE_FILEBASED_PLATFORM,
DatasetConstants.PLATFORM_HDFS);
+ Path dataDir = new
Path(state.getProp(ConfigurationKeys.SOURCE_FILEBASED_DATA_DIRECTORY));
+ String dataset = Path.getPathWithoutSchemeAndAuthority(dataDir).toString();
+ DatasetDescriptor datasetDescriptor = new DatasetDescriptor(platform,
dataset);
+
+ String partitionName =
workUnit.getProp(ConfigurationKeys.WORK_UNIT_DATE_PARTITION_NAME);
+ PartitionDescriptor descriptor = new PartitionDescriptor(partitionName,
datasetDescriptor);
+
+ lineageInfo.get().setSource(descriptor, workUnit);
}
/**
@@ -290,7 +319,7 @@ public abstract class PartitionedFileSourceBase<SCHEMA, DATA> extends FileBasedS
singleWorkUnit.setProp(ConfigurationKeys.WORK_UNIT_LOW_WATER_MARK_KEY,
file.getWatermarkMsSinceEpoch());
singleWorkUnit.setProp(ConfigurationKeys.WORK_UNIT_HIGH_WATER_MARK_KEY,
file.getWatermarkMsSinceEpoch());
singleWorkUnit.setProp(ConfigurationKeys.WORK_UNIT_DATE_PARTITION_KEY,
file.getWatermarkMsSinceEpoch());
-
+
singleWorkUnit.setProp(ConfigurationKeys.WORK_UNIT_DATE_PARTITION_NAME,
file.getPartitionName());
if
(this.sourceState.getPropAsBoolean(ConfigurationKeys.SCHEMA_IN_SOURCE_DIR,
ConfigurationKeys.DEFAULT_SCHEMA_IN_SOURCE_DIR)) {
addSchemaFile(file, singleWorkUnit);
diff --git a/gobblin-core/src/main/java/org/apache/gobblin/source/RegexBasedPartitionedRetriever.java b/gobblin-core/src/main/java/org/apache/gobblin/source/RegexBasedPartitionedRetriever.java
index 7d3ad92..82bd850 100644
--- a/gobblin-core/src/main/java/org/apache/gobblin/source/RegexBasedPartitionedRetriever.java
+++ b/gobblin-core/src/main/java/org/apache/gobblin/source/RegexBasedPartitionedRetriever.java
@@ -38,6 +38,7 @@ import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.configuration.SourceState;
import org.apache.gobblin.source.extractor.filebased.FileBasedHelperException;
import org.apache.gobblin.source.extractor.hadoop.HadoopFsHelper;
+import org.apache.gobblin.util.PathUtils;
public class RegexBasedPartitionedRetriever implements
PartitionAwareFileRetriever {
@@ -114,7 +115,8 @@ public class RegexBasedPartitionedRetriever implements PartitionAwareFileRetriev
filesToProcess.add(new FileInfo(
file.getPath().toString(),
file.getLen(),
- outerDirectory.getWatermarkMsSinceEpoch()
+ outerDirectory.getWatermarkMsSinceEpoch(),
+ outerDirectory.getPartitionName()
));
}
@@ -152,7 +154,7 @@ public class RegexBasedPartitionedRetriever implements PartitionAwareFileRetriev
outerDirectories.add(new FileInfo(
file.getPath().toString(),
0,
- watermark
+ watermark, PathUtils.relativizePath(file.getPath(),
sourceDir).toString()
));
} else {
LOGGER.info("Ignoring directory {} - watermark {} is not between
minWatermark {} and (now-leadTime) {}",
diff --git a/gobblin-core/src/main/java/org/apache/gobblin/source/extractor/filebased/FileBasedSource.java b/gobblin-core/src/main/java/org/apache/gobblin/source/extractor/filebased/FileBasedSource.java
index e34a28f..14730e8 100644
--- a/gobblin-core/src/main/java/org/apache/gobblin/source/extractor/filebased/FileBasedSource.java
+++ b/gobblin-core/src/main/java/org/apache/gobblin/source/extractor/filebased/FileBasedSource.java
@@ -30,10 +30,12 @@ import java.util.List;
import java.util.Set;
import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;
+import com.google.common.base.Optional;
import com.google.common.base.Strings;
import com.google.common.base.Throwables;
import com.google.common.collect.Lists;
@@ -43,8 +45,12 @@ import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.configuration.SourceState;
import org.apache.gobblin.configuration.State;
import org.apache.gobblin.configuration.WorkUnitState;
+import org.apache.gobblin.dataset.DatasetConstants;
+import org.apache.gobblin.dataset.DatasetDescriptor;
+import org.apache.gobblin.metrics.event.lineage.LineageInfo;
import org.apache.gobblin.source.extractor.extract.AbstractSource;
import org.apache.gobblin.source.workunit.Extract;
+import org.apache.gobblin.source.workunit.MultiWorkUnit;
import org.apache.gobblin.source.workunit.WorkUnit;
import org.apache.gobblin.source.workunit.Extract.TableType;
@@ -60,6 +66,8 @@ public abstract class FileBasedSource<S, D> extends AbstractSource<S, D> {
protected TimestampAwareFileBasedHelper fsHelper;
protected String splitPattern = ":::";
+ protected Optional<LineageInfo> lineageInfo;
+
/**
* Initialize the logger.
*
@@ -85,6 +93,8 @@ public abstract class FileBasedSource<S, D> extends AbstractSource<S, D> {
@Override
public List<WorkUnit> getWorkunits(SourceState state) {
initLogger(state);
+ lineageInfo = LineageInfo.getLineageInfo(state.getBroker());
+
try {
initFileSystemHelper(state);
} catch (FileBasedHelperException e) {
@@ -199,10 +209,44 @@ public abstract class FileBasedSource<S, D> extends AbstractSource<S, D> {
log.info("Total number of work units for the current run: " +
(workUnits.size() - previousWorkUnitsForRetry.size()));
}
+ addLineageSourceInfo(workUnits, state);
return workUnits;
}
/**
+ * Add lineage source info to a list of work units, it can have instances of
+ * {@link org.apache.gobblin.source.workunit.MultiWorkUnit}
+ */
+ protected void addLineageSourceInfo(List<WorkUnit> workUnits, State state) {
+ workUnits.forEach(workUnit -> {
+ if (workUnit instanceof MultiWorkUnit) {
+ ((MultiWorkUnit) workUnit).getWorkUnits().forEach((wu ->
addLineageSourceInfo(wu, state)));
+ } else {
+ addLineageSourceInfo(workUnit, state);
+ }
+ });
+ }
+
+ /**
+ * Add lineage source info to a single work unit
+ *
+ * @param workUnit a single work unit, not an instance of {@link
org.apache.gobblin.source.workunit.MultiWorkUnit}
+ * @param state configurations
+ */
+ protected void addLineageSourceInfo(WorkUnit workUnit, State state) {
+ if (!lineageInfo.isPresent()) {
+ log.info("Lineage is not enabled");
+ return;
+ }
+
+ String platform =
state.getProp(ConfigurationKeys.SOURCE_FILEBASED_PLATFORM,
DatasetConstants.PLATFORM_HDFS);
+ Path dataDir = new
Path(state.getProp(ConfigurationKeys.SOURCE_FILEBASED_DATA_DIRECTORY));
+ String dataset = Path.getPathWithoutSchemeAndAuthority(dataDir).toString();
+ DatasetDescriptor source = new DatasetDescriptor(platform, dataset);
+ lineageInfo.get().setSource(source, workUnit);
+ }
+
+ /**
* This method is responsible for connecting to the source and taking
* a snapshot of the folder where the data is present, it then returns
* a list of the files in String format
diff --git a/gobblin-core/src/test/java/org/apache/gobblin/source/RegexBasedPartitionedRetrieverTest.java b/gobblin-core/src/test/java/org/apache/gobblin/source/RegexBasedPartitionedRetrieverTest.java
index cb35eb6..2a6d92b 100644
--- a/gobblin-core/src/test/java/org/apache/gobblin/source/RegexBasedPartitionedRetrieverTest.java
+++ b/gobblin-core/src/test/java/org/apache/gobblin/source/RegexBasedPartitionedRetrieverTest.java
@@ -130,5 +130,6 @@ public class RegexBasedPartitionedRetrieverTest {
Assert.assertTrue(fileInfo.getFilePath().startsWith(expectedStart.toString()));
Assert.assertTrue(fileInfo.getFilePath().endsWith(expectedEnd));
Assert.assertEquals(fileInfo.getFileSize(), 0);
+ Assert.assertEquals(fileInfo.getPartitionName(),
String.format("%d-PT-123456", value));
}
}
diff --git a/gobblin-core/src/test/java/org/apache/gobblin/source/extractor/filebased/FileBasedSourceTest.java b/gobblin-core/src/test/java/org/apache/gobblin/source/extractor/filebased/FileBasedSourceTest.java
index 95b3656..15d9616 100644
--- a/gobblin-core/src/test/java/org/apache/gobblin/source/extractor/filebased/FileBasedSourceTest.java
+++ b/gobblin-core/src/test/java/org/apache/gobblin/source/extractor/filebased/FileBasedSourceTest.java
@@ -17,112 +17,204 @@
package org.apache.gobblin.source.extractor.filebased;
+import org.apache.gobblin.broker.SharedResourcesBrokerFactory;
+import org.apache.gobblin.broker.gobblin_scopes.GobblinScopeTypes;
+import org.apache.gobblin.broker.gobblin_scopes.JobScopeInstance;
+import org.apache.gobblin.broker.iface.SharedResourcesBroker;
import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.configuration.SourceState;
import org.apache.gobblin.configuration.State;
import org.apache.gobblin.configuration.WorkUnitState;
+import org.apache.gobblin.dataset.DatasetConstants;
+import org.apache.gobblin.dataset.DatasetDescriptor;
+import org.apache.gobblin.dataset.Descriptor;
+import org.apache.gobblin.dataset.PartitionDescriptor;
import org.apache.gobblin.source.DatePartitionedJsonFileSource;
import org.apache.gobblin.source.PartitionedFileSourceBase;
import org.apache.gobblin.source.extractor.DataRecordException;
import org.apache.gobblin.source.extractor.Extractor;
+import org.apache.gobblin.source.extractor.hadoop.AvroFileSource;
import org.apache.gobblin.source.workunit.Extract;
+import org.apache.gobblin.source.workunit.MultiWorkUnit;
import org.apache.gobblin.source.workunit.WorkUnit;
+
import org.apache.hadoop.fs.Path;
import org.junit.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
import org.testng.collections.Lists;
import java.io.IOException;
import java.util.List;
+import java.util.Set;
+
+import com.google.common.collect.Sets;
+import com.typesafe.config.ConfigFactory;
@Test
public class FileBasedSourceTest {
- @Test
- public void testFailJobWhenPreviousStateExistsButDoesNotHaveSnapshot() {
- try {
- DummyFileBasedSource source = new DummyFileBasedSource();
-
- WorkUnitState workUnitState = new WorkUnitState();
- workUnitState.setId("priorState");
- List<WorkUnitState> workUnitStates =
Lists.newArrayList(workUnitState);
-
- State state = new State();
- state.setProp(ConfigurationKeys.EXTRACT_TABLE_TYPE_KEY,
Extract.TableType.SNAPSHOT_ONLY.toString());
-
state.setProp(ConfigurationKeys.SOURCE_FILEBASED_FS_PRIOR_SNAPSHOT_REQUIRED,
true);
-
- SourceState sourceState = new SourceState(state, workUnitStates);
-
- source.getWorkunits(sourceState);
- Assert.fail("Expected RuntimeException, but no exceptions were
thrown.");
- } catch (RuntimeException e) {
- Assert.assertEquals("No 'source.filebased.fs.snapshot' found on
state of prior job",
- e.getMessage());
- }
+
+ private static final String SOURCE_LINEAGE_KEY =
"gobblin.event.lineage.source";
+ SharedResourcesBroker<GobblinScopeTypes> instanceBroker;
+ SharedResourcesBroker<GobblinScopeTypes> jobBroker;
+ Path sourceDir;
+
+ @BeforeClass
+ public void setup() {
+ instanceBroker = SharedResourcesBrokerFactory
+ .createDefaultTopLevelBroker(ConfigFactory.empty(),
GobblinScopeTypes.GLOBAL.defaultScopeInstance());
+ jobBroker = instanceBroker
+ .newSubscopedBuilder(new JobScopeInstance("LineageEventTest",
String.valueOf(System.currentTimeMillis())))
+ .build();
+ sourceDir = new Path(getClass().getResource("/source").toString());
+ }
+
+ @Test
+ public void testFailJobWhenPreviousStateExistsButDoesNotHaveSnapshot() {
+ try {
+ DummyFileBasedSource source = new DummyFileBasedSource();
+
+ WorkUnitState workUnitState = new WorkUnitState();
+ workUnitState.setId("priorState");
+ List<WorkUnitState> workUnitStates = Lists.newArrayList(workUnitState);
+
+ State state = new State();
+ state.setProp(ConfigurationKeys.EXTRACT_TABLE_TYPE_KEY,
Extract.TableType.SNAPSHOT_ONLY.toString());
+
state.setProp(ConfigurationKeys.SOURCE_FILEBASED_FS_PRIOR_SNAPSHOT_REQUIRED,
true);
+
+ SourceState sourceState = new SourceState(state, workUnitStates);
+
+ source.getWorkunits(sourceState);
+ Assert.fail("Expected RuntimeException, but no exceptions were thrown.");
+ } catch (RuntimeException e) {
+ Assert.assertEquals("No 'source.filebased.fs.snapshot' found on state of
prior job", e.getMessage());
+ }
+ }
+
+ @Test
+ void numberOfWorkUnits()
+ throws IOException {
+ SourceState sourceState = new SourceState();
+ sourceState.setBroker(jobBroker);
+ DatePartitionedJsonFileSource source = new DatePartitionedJsonFileSource();
+ initState(sourceState);
+ List<WorkUnit> workUnits = source.getWorkunits(sourceState);
+ Assert.assertEquals(3, workUnits.size());
+ }
+
+ @Test
+ public void testSourceLineage() {
+ String dataset =
Path.getPathWithoutSchemeAndAuthority(sourceDir).toString();
+
+ SourceState sourceState = new SourceState();
+ sourceState.setBroker(jobBroker);
+ initState(sourceState);
+
+ // Avro file based source
+ AvroFileSource fileSource = new AvroFileSource();
+ List<WorkUnit> workUnits = fileSource.getWorkunits(sourceState);
+ DatasetDescriptor datasetDescriptor = new DatasetDescriptor("hdfs",
dataset);
+ for (WorkUnit workUnit : workUnits) {
+ Assert.assertEquals(workUnit.getProp(SOURCE_LINEAGE_KEY),
Descriptor.toJson(datasetDescriptor));
+ }
+
+ // Partitioned file based source
+ // Test platform configuration
+ sourceState.setProp(ConfigurationKeys.SOURCE_FILEBASED_PLATFORM,
DatasetConstants.PLATFORM_FILE);
+ DatePartitionedJsonFileSource partitionedFileSource = new
DatePartitionedJsonFileSource();
+ workUnits = partitionedFileSource.getWorkunits(sourceState);
+ datasetDescriptor = new DatasetDescriptor("file", dataset);
+
+ Set<String> partitions = Sets.newHashSet("2017-12", "2018-01");
+ for (WorkUnit workUnit : workUnits) {
+ if (workUnit instanceof MultiWorkUnit) {
+ DatasetDescriptor finalDatasetDescriptor = datasetDescriptor;
+ ((MultiWorkUnit) workUnit).getWorkUnits().forEach( wu ->
verifyPartitionSourceLineage(wu, partitions,
+ finalDatasetDescriptor));
+ } else {
+ verifyPartitionSourceLineage(workUnit, partitions, datasetDescriptor);
+ }
+ }
+ }
+
+ private void verifyPartitionSourceLineage(WorkUnit wu, Set<String>
partitions, DatasetDescriptor datasetDescriptor) {
+ PartitionDescriptor descriptor = (PartitionDescriptor)
Descriptor.fromJson(wu.getProp(SOURCE_LINEAGE_KEY));
+ Assert.assertTrue(partitions.contains(descriptor.getName()));
+ Assert.assertEquals(descriptor.getDataset(), datasetDescriptor);
+ }
+
+ private void initState(State state) {
+ state.setProp(ConfigurationKeys.SOURCE_FILEBASED_DATA_DIRECTORY,
sourceDir.toString());
+
state.setProp(PartitionedFileSourceBase.DATE_PARTITIONED_SOURCE_PARTITION_PATTERN,
"yyyy-MM");
+
state.setProp(PartitionedFileSourceBase.DATE_PARTITIONED_SOURCE_MIN_WATERMARK_VALUE,
"2017-11");
+ state.setProp(ConfigurationKeys.EXTRACT_TABLE_TYPE_KEY, "snapshot_only");
+ state.setProp(ConfigurationKeys.SOURCE_FILEBASED_FS_URI, "file:///");
+ state.setProp(ConfigurationKeys.SCHEMA_IN_SOURCE_DIR, "true");
+ state.setProp(ConfigurationKeys.SCHEMA_FILENAME, "metadata.json");
+ }
+
+ @AfterClass
+ public void cleanup()
+ throws IOException {
+ if (jobBroker != null) {
+ jobBroker.close();
+ }
+
+ if (instanceBroker != null) {
+ instanceBroker.close();
+ }
+ }
+
+ private static class DummyFileBasedSource extends FileBasedSource<String,
String> {
+ @Override
+ public void initFileSystemHelper(State state)
+ throws FileBasedHelperException {
+ }
+
+ @Override
+ protected List<WorkUnit> getPreviousWorkUnitsForRetry(SourceState state) {
+ return Lists.newArrayList();
+ }
+
+ @Override
+ public List<String> getcurrentFsSnapshot(State state) {
+ return Lists.newArrayList("SnapshotEntry");
+ }
+
+ @Override
+ public Extractor<String, String> getExtractor(WorkUnitState state)
+ throws IOException {
+ return new DummyExtractor();
+ }
+ }
+
+ private static class DummyExtractor implements Extractor<String, String> {
+ @Override
+ public String getSchema() {
+ return "";
}
- @Test void numberOfWorkUnits() throws IOException {
- SourceState sourceState = new SourceState();
- DatePartitionedJsonFileSource source = new
DatePartitionedJsonFileSource();
- initState(sourceState);
- List<WorkUnit> workUnits = source.getWorkunits(sourceState);
- Assert.assertEquals(3, workUnits.size());
+ @Override
+ public String readRecord(String reuse)
+ throws DataRecordException, IOException {
+ return null;
}
- private void initState(State state) {
- state.setProp(ConfigurationKeys.SOURCE_FILEBASED_DATA_DIRECTORY,
- new Path(getClass().getResource("/source").toString()).toString());
-
state.setProp(PartitionedFileSourceBase.DATE_PARTITIONED_SOURCE_PARTITION_PATTERN,
"yyyy-MM");
-
state.setProp(PartitionedFileSourceBase.DATE_PARTITIONED_SOURCE_MIN_WATERMARK_VALUE,
"2017-11");
- state.setProp(ConfigurationKeys.EXTRACT_TABLE_TYPE_KEY,
"snapshot_only");
- state.setProp(ConfigurationKeys.SOURCE_FILEBASED_FS_URI, "file:///");
- state.setProp(ConfigurationKeys.SCHEMA_IN_SOURCE_DIR, "true");
- state.setProp(ConfigurationKeys.SCHEMA_FILENAME, "metadata.json");
+ @Override
+ public long getExpectedRecordCount() {
+ return 0;
}
- private static class DummyFileBasedSource extends FileBasedSource<String,
String> {
- @Override
- public void initFileSystemHelper(State state) throws
FileBasedHelperException {
- }
-
- @Override
- protected List<WorkUnit> getPreviousWorkUnitsForRetry(SourceState
state) {
- return Lists.newArrayList();
- }
-
- @Override
- public List<String> getcurrentFsSnapshot(State state) {
- return Lists.newArrayList("SnapshotEntry");
- }
-
- @Override
- public Extractor<String, String> getExtractor(WorkUnitState state)
throws IOException {
- return new DummyExtractor();
- }
+ @Override
+ public long getHighWatermark() {
+ return 0;
}
- private static class DummyExtractor implements Extractor<String, String> {
- @Override
- public String getSchema() {
- return "";
- }
-
- @Override
- public String readRecord(String reuse) throws DataRecordException,
IOException {
- return null;
- }
-
- @Override
- public long getExpectedRecordCount() {
- return 0;
- }
-
- @Override
- public long getHighWatermark() {
- return 0;
- }
-
- @Override
- public void close() throws IOException {}
+ @Override
+ public void close()
+ throws IOException {
}
+ }
}
diff --git a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/lineage/LineageInfo.java b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/lineage/LineageInfo.java
index 3e65105..ed8903d 100644
--- a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/lineage/LineageInfo.java
+++ b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/lineage/LineageInfo.java
@@ -24,6 +24,8 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
+import org.apache.commons.lang3.StringUtils;
+
import com.google.common.base.Joiner;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
@@ -158,8 +160,15 @@ public final class LineageInfo {
resolvedDescriptors.add(resolvedDescriptor);
}
- state.setProp(getKey(BRANCH, branchId, LineageEventBuilder.DESTINATION),
- Descriptor.toJson(resolvedDescriptors));
+ String destinationKey = getKey(BRANCH, branchId,
LineageEventBuilder.DESTINATION);
+ String currentDestinations = state.getProp(destinationKey);
+ List<Descriptor> allDescriptors = Lists.newArrayList();
+ if (StringUtils.isNotEmpty(currentDestinations)) {
+ allDescriptors = Descriptor.fromJsonList(currentDestinations);
+ }
+ allDescriptors.addAll(resolvedDescriptors);
+
+ state.setProp(destinationKey, Descriptor.toJson(allDescriptors));
}
}
diff --git a/gobblin-metrics-libs/gobblin-metrics-base/src/test/java/org/apache/gobblin/metrics/event/lineage/LineageEventTest.java b/gobblin-metrics-libs/gobblin-metrics-base/src/test/java/org/apache/gobblin/metrics/event/lineage/LineageEventTest.java
index 5fd7952..e792a7f 100644
--- a/gobblin-metrics-libs/gobblin-metrics-base/src/test/java/org/apache/gobblin/metrics/event/lineage/LineageEventTest.java
+++ b/gobblin-metrics-libs/gobblin-metrics-base/src/test/java/org/apache/gobblin/metrics/event/lineage/LineageEventTest.java
@@ -138,6 +138,36 @@ public class LineageEventTest {
Assert.assertEquals(LineageEventBuilder.fromEvent(trackingEvent), event);
}
+ @Test
+ public void testMultiPuts() {
+ final String topic = "testTopic";
+ final String kafka = "kafka";
+ final String hdfs = "hdfs";
+ final String path = "/data/tracking/PageViewEvent";
+ final String partitionName = "hourly/2018/08/15/15";
+
+ State state = new State();
+ LineageInfo lineageInfo = getLineageInfo();
+ DatasetDescriptor source = new DatasetDescriptor(kafka, topic);
+ lineageInfo.setSource(source, state);
+ DatasetDescriptor destinationDataset = new DatasetDescriptor(hdfs, path);
+ PartitionDescriptor destination = new PartitionDescriptor(partitionName,
destinationDataset);
+ lineageInfo.putDestination(Lists.newArrayList(destination), 0, state);
+
+ // Put another destination
+ DatasetDescriptor destinationDataset2 = new DatasetDescriptor(kafka,
"nextTopic");
+ lineageInfo.putDestination(Lists.newArrayList(destinationDataset2), 0,
state);
+
+ Map<String, Set<LineageEventBuilder>> eventsMap = LineageInfo.load(state);
+ Assert.assertEquals(eventsMap.size(), 1);
+
+ Set<LineageEventBuilder> events = eventsMap.get("0");
+ Assert.assertEquals(events.size(), 2);
+
+ verifyOne(events, topic, source, destination);
+ verifyOne(events, topic, source, destinationDataset2);
+ }
+
private LineageEventBuilder getLineageEvent(Collection<LineageEventBuilder>
events, int branchId, String destinationPlatform) {
for (LineageEventBuilder event : events) {
DatasetDescriptor descriptor = (DatasetDescriptor)
event.getDestination();
@@ -175,4 +205,15 @@ public class LineageEventTest {
private <T> T first(Collection<T> collection) {
return collection.iterator().next();
}
+
+ private void verifyOne(Collection<LineageEventBuilder> collection, String
name, Descriptor source, Descriptor destination) {
+ for (LineageEventBuilder event : collection) {
+ if (event.getDestination().equals(destination)) {
+ verify(event, name, source, destination);
+ return;
+ }
+ }
+
+ Assert.fail("Could not find a matching lineage with destination: " +
destination);
+ }
}