Repository: incubator-gobblin Updated Branches: refs/heads/master 1447f2d3b -> b39bf8cab
[GOBBLIN-464] Enhance LoopingDatasetFinderSource to support global watermark and per-dataset watermark. Closes #2336 from sv2000/loopingwm Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/b39bf8ca Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/b39bf8ca Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/b39bf8ca Branch: refs/heads/master Commit: b39bf8cabcabe7864719f4121ccffa09d9f6c08a Parents: 1447f2d Author: suvasude <[email protected]> Authored: Sat Apr 21 13:54:27 2018 -0700 Committer: Hung Tran <[email protected]> Committed: Sat Apr 21 13:54:27 2018 -0700 ---------------------------------------------------------------------- .../CombinedWorkUnitAndDatasetState.java | 34 ++ ...mbinedWorkUnitAndDatasetStateFunctional.java | 33 ++ .../configuration/ConfigurationKeys.java | 1 + .../gobblin/configuration/SourceState.java | 113 +++- .../source/LoopingDatasetFinderSource.java | 139 +++-- .../source/LoopingDatasetFinderSourceTest.java | 546 ++++++++++++++++--- .../gobblin/runtime/ZkDatasetStateStore.java | 2 +- ...ombinedWorkUnitAndDatasetStateGenerator.java | 71 +++ .../gobblin/runtime/FsDatasetStateStore.java | 44 +- .../org/apache/gobblin/runtime/JobContext.java | 33 +- .../org/apache/gobblin/runtime/JobState.java | 11 +- .../gobblin/runtime/MysqlDatasetStateStore.java | 2 +- 12 files changed, 867 insertions(+), 162 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/b39bf8ca/gobblin-api/src/main/java/org/apache/gobblin/configuration/CombinedWorkUnitAndDatasetState.java ---------------------------------------------------------------------- diff --git a/gobblin-api/src/main/java/org/apache/gobblin/configuration/CombinedWorkUnitAndDatasetState.java b/gobblin-api/src/main/java/org/apache/gobblin/configuration/CombinedWorkUnitAndDatasetState.java new file mode 100644 index 0000000..9b2743c --- /dev/null +++ b/gobblin-api/src/main/java/org/apache/gobblin/configuration/CombinedWorkUnitAndDatasetState.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gobblin.configuration; + +import java.util.List; +import java.util.Map; + +import lombok.AllArgsConstructor; +import lombok.Getter; + + +/** + * A class that encapsulates {@link WorkUnitState} and DatasetStates. + */ +@AllArgsConstructor +@Getter +public class CombinedWorkUnitAndDatasetState { + private List<WorkUnitState> previousWorkUnitStates; + private Map<String, ? extends SourceState> previousDatasetStatesByUrns; +} http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/b39bf8ca/gobblin-api/src/main/java/org/apache/gobblin/configuration/CombinedWorkUnitAndDatasetStateFunctional.java ---------------------------------------------------------------------- diff --git a/gobblin-api/src/main/java/org/apache/gobblin/configuration/CombinedWorkUnitAndDatasetStateFunctional.java b/gobblin-api/src/main/java/org/apache/gobblin/configuration/CombinedWorkUnitAndDatasetStateFunctional.java new file mode 100644 index 0000000..9543e00 --- /dev/null +++ b/gobblin-api/src/main/java/org/apache/gobblin/configuration/CombinedWorkUnitAndDatasetStateFunctional.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.configuration; + +/** + * A {@link FunctionalInterface} to return {@link CombinedWorkUnitAndDatasetState}. + */ +@FunctionalInterface +public interface CombinedWorkUnitAndDatasetStateFunctional { + /** + * + * @param datasetUrn the dataset urn + * @return an instance {@link CombinedWorkUnitAndDatasetState}. If datasetUrn is null or empty, then return latest {@link WorkUnitState}s and DatasetStates of all datasetUrns in the state store; else return the previous {@link WorkUnitState} and the DatasetState of the specified datasetUrn. + * @throws Exception the exception + */ + public CombinedWorkUnitAndDatasetState getCombinedWorkUnitAndDatasetState(String datasetUrn) + throws Exception; +} http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/b39bf8ca/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java ---------------------------------------------------------------------- diff --git a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java index 8e2d225..a0eeca3 100644 --- a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java +++ b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java @@ -246,6 +246,7 @@ public class ConfigurationKeys { */ // This property is used to specify the URN of a dataset a job or WorkUnit extracts data for public static final String DATASET_URN_KEY = "dataset.urn"; + public static final String GLOBAL_WATERMARK_DATASET_URN="__globalDatasetWatermark"; public static final String DEFAULT_DATASET_URN = ""; /** http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/b39bf8ca/gobblin-api/src/main/java/org/apache/gobblin/configuration/SourceState.java ---------------------------------------------------------------------- diff --git a/gobblin-api/src/main/java/org/apache/gobblin/configuration/SourceState.java b/gobblin-api/src/main/java/org/apache/gobblin/configuration/SourceState.java index fd772ea..9c08a5d 100644 --- a/gobblin-api/src/main/java/org/apache/gobblin/configuration/SourceState.java +++ b/gobblin-api/src/main/java/org/apache/gobblin/configuration/SourceState.java @@ -20,6 +20,7 @@ package org.apache.gobblin.configuration; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; +import java.util.ArrayList; import java.util.List; import java.util.Locale; import java.util.Map; @@ -64,19 +65,36 @@ public class SourceState extends State { private static final DateTimeFormatter DTF = DateTimeFormat.forPattern("yyyyMMddHHmmss").withLocale(Locale.US).withZone(DateTimeZone.UTC); + private Map<String, SourceState> previousDatasetStatesByUrns; + + private List<WorkUnitState> previousWorkUnitStates = Lists.newArrayList(); + @Getter - private final Map<String, SourceState> previousDatasetStatesByUrns; + @Setter + private SharedResourcesBroker<GobblinScopeTypes> broker; @Getter - private final List<WorkUnitState> previousWorkUnitStates = Lists.newArrayList(); + @Setter + private CombinedWorkUnitAndDatasetStateFunctional workUnitAndDatasetStateFunctional; - @Getter @Setter - private SharedResourcesBroker<GobblinScopeTypes> broker; + private boolean areWorkUnitStatesMaterialized; /** * Default constructor. */ public SourceState() { + this.previousWorkUnitStates = new ArrayList<>(); + this.previousDatasetStatesByUrns = ImmutableMap.of(); + } + + /** + * Constructor. + * + * @param properties job configuration properties + */ + public SourceState(State properties) { + super.addAll(properties); + this.previousWorkUnitStates = new ArrayList<>(); this.previousDatasetStatesByUrns = ImmutableMap.of(); } @@ -127,7 +145,8 @@ public class SourceState extends State { /** * Get the state (in the form of a {@link SourceState}) of a dataset identified by a dataset URN - * of the previous job run. + * of the previous job run. Useful when dataset state store is enabled and we want to load the latest + * state of a global watermark dataset. * * @param datasetUrn the dataset URN * @return the dataset state (in the form of a {@link SourceState}) of the previous job run @@ -141,6 +160,54 @@ public class SourceState extends State { } /** + * + * @return a {@link Map} from dataset URNs (as being specified by {@link ConfigurationKeys#DATASET_URN_KEY} + * to the {@link SourceState} with the dataset URNs. The map is materialized upon invocation of the method + * by the source. Subsequent calls to this method will return the previously materialized map. + * <p> + * {@link SourceState}s that do not have {@link ConfigurationKeys#DATASET_URN_KEY} set will be added + * to the dataset state belonging to {@link ConfigurationKeys#DEFAULT_DATASET_URN}. + * </p> + * + * @return a {@link Map} from dataset URNs to the {@link SourceState} with the dataset URNs + */ + public Map<String, SourceState> getPreviousDatasetStatesByUrns() { + if (this.workUnitAndDatasetStateFunctional != null) { + materializeWorkUnitAndDatasetStates(null); + } + return this.previousDatasetStatesByUrns; + } + + /** + * Get a {@link List} of previous {@link WorkUnitState}s. The list is lazily materialized upon invocation of the + * method by the {@link org.apache.gobblin.source.Source}. Subsequent calls to this method will return the previously + * materialized map. + */ + public List<WorkUnitState> getPreviousWorkUnitStates() { + if (this.workUnitAndDatasetStateFunctional != null) { + materializeWorkUnitAndDatasetStates(null); + } + return this.previousWorkUnitStates; + } + + /** + * Get a {@link List} of previous {@link WorkUnitState}s for a given datasetUrn. + * @param datasetUrn + * @return {@link List} of {@link WorkUnitState}s. + */ + public List<WorkUnitState> getPreviousWorkUnitStates(String datasetUrn) { + if (this.workUnitAndDatasetStateFunctional != null) { + try { + CombinedWorkUnitAndDatasetState state = this.workUnitAndDatasetStateFunctional.getCombinedWorkUnitAndDatasetState(datasetUrn); + return state.getPreviousWorkUnitStates(); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + return this.previousWorkUnitStates; + } + + /** * Get a {@link Map} from dataset URNs (as being specified by {@link ConfigurationKeys#DATASET_URN_KEY} * to the {@link WorkUnitState} with the dataset URNs. * @@ -153,12 +220,14 @@ public class SourceState extends State { */ public Map<String, Iterable<WorkUnitState>> getPreviousWorkUnitStatesByDatasetUrns() { Map<String, Iterable<WorkUnitState>> previousWorkUnitStatesByDatasetUrns = Maps.newHashMap(); - + if (this.workUnitAndDatasetStateFunctional != null) { + materializeWorkUnitAndDatasetStates(null); + } for (WorkUnitState workUnitState : this.previousWorkUnitStates) { String datasetUrn = workUnitState.getProp(ConfigurationKeys.DATASET_URN_KEY, ConfigurationKeys.DEFAULT_DATASET_URN); if (!previousWorkUnitStatesByDatasetUrns.containsKey(datasetUrn)) { - previousWorkUnitStatesByDatasetUrns.put(datasetUrn, Lists.<WorkUnitState> newArrayList()); + previousWorkUnitStatesByDatasetUrns.put(datasetUrn, Lists.newArrayList()); } ((List<WorkUnitState>) previousWorkUnitStatesByDatasetUrns.get(datasetUrn)).add(workUnitState); } @@ -167,6 +236,25 @@ public class SourceState extends State { } /** + * A thread-safe method for materializing previous {@link WorkUnitState}s and DatasetStates. + * @param datasetUrn + */ + private synchronized void materializeWorkUnitAndDatasetStates(String datasetUrn) { + if (!this.areWorkUnitStatesMaterialized) { + try { + CombinedWorkUnitAndDatasetState workUnitAndDatasetState = + this.workUnitAndDatasetStateFunctional.getCombinedWorkUnitAndDatasetState(datasetUrn); + this.previousWorkUnitStates = workUnitAndDatasetState.getPreviousWorkUnitStates(); + this.previousDatasetStatesByUrns = + (Map<String, SourceState>) workUnitAndDatasetState.getPreviousDatasetStatesByUrns(); + this.areWorkUnitStatesMaterialized = true; + } catch (Exception e) { + throw new RuntimeException(e); + } + } + } + + /** * Create a new properly populated {@link Extract} instance. * * <p> @@ -179,7 +267,7 @@ public class SourceState extends State { * @return a new unique {@link Extract} instance * * @Deprecated Use {@link org.apache.gobblin.source.extractor.extract.AbstractSource#createExtract( - * org.apache.gobblin.source.workunit.Extract.TableType, String, String)} + *org.apache.gobblin.source.workunit.Extract.TableType, String, String)} */ @Deprecated public synchronized Extract createExtract(Extract.TableType type, String namespace, String table) { @@ -211,7 +299,8 @@ public class SourceState extends State { } @Override - public void write(DataOutput out) throws IOException { + public void write(DataOutput out) + throws IOException { out.writeInt(this.previousWorkUnitStates.size()); for (WorkUnitState state : this.previousWorkUnitStates) { state.write(out); @@ -220,7 +309,8 @@ public class SourceState extends State { } @Override - public void readFields(DataInput in) throws IOException { + public void readFields(DataInput in) + throws IOException { int size = in.readInt(); for (int i = 0; i < size; i++) { WorkUnitState workUnitState = new WorkUnitState(); @@ -261,7 +351,8 @@ public class SourceState extends State { } @Override - public void readFields(DataInput in) throws IOException { + public void readFields(DataInput in) + throws IOException { throw new UnsupportedOperationException(); } http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/b39bf8ca/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/source/LoopingDatasetFinderSource.java ---------------------------------------------------------------------- diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/source/LoopingDatasetFinderSource.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/source/LoopingDatasetFinderSource.java index 4ca0dcb..355f463 100644 --- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/source/LoopingDatasetFinderSource.java +++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/source/LoopingDatasetFinderSource.java @@ -14,7 +14,6 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - package org.apache.gobblin.data.management.source; import java.io.IOException; @@ -25,6 +24,15 @@ import java.util.Spliterator; import java.util.stream.Stream; import java.util.stream.StreamSupport; +import com.google.common.base.Joiner; +import com.google.common.base.Preconditions; +import com.google.common.collect.AbstractIterator; +import com.google.common.collect.Iterators; +import com.google.common.collect.Lists; +import com.google.common.collect.PeekingIterator; +import com.typesafe.config.Config; + +import org.apache.gobblin.configuration.ConfigurationKeys; import org.apache.gobblin.configuration.SourceState; import org.apache.gobblin.configuration.WorkUnitState; import org.apache.gobblin.dataset.Dataset; @@ -36,16 +44,11 @@ import org.apache.gobblin.runtime.task.NoopTask; import org.apache.gobblin.source.workunit.BasicWorkUnitStream; import org.apache.gobblin.source.workunit.WorkUnit; import org.apache.gobblin.source.workunit.WorkUnitStream; - -import com.google.common.collect.AbstractIterator; -import com.google.common.collect.Iterators; -import com.google.common.collect.Lists; -import com.google.common.collect.PeekingIterator; +import org.apache.gobblin.util.ConfigUtils; import javax.annotation.Nullable; import lombok.extern.slf4j.Slf4j; - /** * A source that processes datasets generated by a {@link org.apache.gobblin.dataset.DatasetsFinder}, processing a few of * them each run, and continuing from where it left off in the next run. When it is done processing all the datasets, it @@ -55,16 +58,19 @@ import lombok.extern.slf4j.Slf4j; */ @Slf4j public abstract class LoopingDatasetFinderSource<S, D> extends DatasetFinderSource<S, D> { - - public static final String MAX_WORK_UNITS_PER_RUN_KEY = "gobblin.source.loopingDatasetFinderSource.maxWorkUnitsPerRun"; + public static final String MAX_WORK_UNITS_PER_RUN_KEY = + "gobblin.source.loopingDatasetFinderSource.maxWorkUnitsPerRun"; public static final int MAX_WORK_UNITS_PER_RUN = 10; + public static final String DATASET_PARTITION_DELIMITER = "@"; - private static final String DATASET_URN = "gobblin.source.loopingDatasetFinderSource.datasetUrn"; - private static final String PARTITION_URN = "gobblin.source.loopingDatasetFinderSource.partitionUrn"; - private static final String WORK_UNIT_ORDINAL = "gobblin.source.loopingDatasetFinderSource.workUnitOrdinal"; + protected static final String DATASET_URN = "gobblin.source.loopingDatasetFinderSource.datasetUrn"; + protected static final String PARTITION_URN = "gobblin.source.loopingDatasetFinderSource.partitionUrn"; protected static final String END_OF_DATASETS_KEY = "gobblin.source.loopingDatasetFinderSource.endOfDatasets"; + protected static final String GLOBAL_WATERMARK_DATASET_KEY = + "gobblin.source.loopingDatasetFinderSource.globalWatermarkDataset"; private final URNLexicographicalComparator lexicographicalComparator = new URNLexicographicalComparator(); + protected boolean isDatasetStateStoreEnabled; /** * @param drilldownIntoPartitions if set to true, will process each partition of a {@link PartitionableDataset} as a @@ -81,19 +87,26 @@ public abstract class LoopingDatasetFinderSource<S, D> extends DatasetFinderSour @Override public WorkUnitStream getWorkunitStream(SourceState state) { + return this.getWorkunitStream(state,false); + } + + public WorkUnitStream getWorkunitStream(SourceState state, boolean isDatasetStateStoreEnabled) { + this.isDatasetStateStoreEnabled = isDatasetStateStoreEnabled; try { int maxWorkUnits = state.getPropAsInt(MAX_WORK_UNITS_PER_RUN_KEY, MAX_WORK_UNITS_PER_RUN); - - List<WorkUnitState> previousWorkUnitStates = state.getPreviousWorkUnitStates(); - Optional<WorkUnitState> maxWorkUnit; - try { - maxWorkUnit = previousWorkUnitStates.stream().reduce((wu1, wu2) -> { - int wu1Ordinal = wu1.getPropAsInt(WORK_UNIT_ORDINAL); - int wu2Ordinal = wu2.getPropAsInt(WORK_UNIT_ORDINAL); - return wu1Ordinal > wu2Ordinal ? wu1 : wu2; - }); - } catch (NumberFormatException nfe) { - throw new RuntimeException("Work units in state store are corrupted! Missing or malformed " + WORK_UNIT_ORDINAL); + Preconditions.checkArgument(maxWorkUnits > 0, "Max work units must be greater than 0!"); + Config config = ConfigUtils.propertiesToConfig(state.getProperties()); + + List<WorkUnitState> previousWorkUnitStates = (this.isDatasetStateStoreEnabled) ? state + .getPreviousWorkUnitStates(ConfigurationKeys.GLOBAL_WATERMARK_DATASET_URN) + : state.getPreviousWorkUnitStates(); + + Optional<WorkUnitState> maxWorkUnit = Optional.empty(); + for (WorkUnitState workUnitState : previousWorkUnitStates) { + if (workUnitState.getPropAsBoolean(GLOBAL_WATERMARK_DATASET_KEY, false)) { + maxWorkUnit = Optional.of(workUnitState); + break; + } } String previousDatasetUrnWatermark = null; @@ -105,11 +118,13 @@ public abstract class LoopingDatasetFinderSource<S, D> extends DatasetFinderSour IterableDatasetFinder datasetsFinder = createDatasetsFinder(state); - Stream<Dataset> datasetStream = datasetsFinder.getDatasetsStream(Spliterator.SORTED, this.lexicographicalComparator); + Stream<Dataset> datasetStream = + datasetsFinder.getDatasetsStream(Spliterator.SORTED, this.lexicographicalComparator); datasetStream = sortStreamLexicographically(datasetStream); - return new BasicWorkUnitStream.Builder(new DeepIterator(datasetStream.iterator(), previousDatasetUrnWatermark, - previousPartitionUrnWatermark, maxWorkUnits)).setFiniteStream(true).build(); + return new BasicWorkUnitStream.Builder( + new DeepIterator(datasetStream.iterator(), previousDatasetUrnWatermark, previousPartitionUrnWatermark, + maxWorkUnits, config)).setFiniteStream(true).build(); } catch (IOException ioe) { throw new RuntimeException(ioe); } @@ -125,13 +140,16 @@ public abstract class LoopingDatasetFinderSource<S, D> extends DatasetFinderSour private Iterator<PartitionableDataset.DatasetPartition> currentPartitionIterator; private int generatedWorkUnits = 0; + private Dataset previousDataset; + private PartitionableDataset.DatasetPartition previousPartition; public DeepIterator(Iterator<Dataset> baseIterator, String previousDatasetUrnWatermark, - String previousPartitionUrnWatermark, int maxWorkUnits) { + String previousPartitionUrnWatermark, int maxWorkUnits, Config config) + throws IOException { this.maxWorkUnits = maxWorkUnits; this.baseIterator = baseIterator; - - Dataset equalDataset = advanceUntilLargerThan(Iterators.peekingIterator(this.baseIterator), previousDatasetUrnWatermark); + Dataset equalDataset = + advanceUntilLargerThan(Iterators.peekingIterator(this.baseIterator), previousDatasetUrnWatermark); if (drilldownIntoPartitions && equalDataset != null && equalDataset instanceof PartitionableDataset) { this.currentPartitionIterator = getPartitionIterator((PartitionableDataset) equalDataset); @@ -145,7 +163,8 @@ public abstract class LoopingDatasetFinderSource<S, D> extends DatasetFinderSour * Advance an iterator until the next value is larger than the reference. * @return the last value polled if it is equal to reference, or null otherwise. */ - @Nullable private <T extends URNIdentified> T advanceUntilLargerThan(PeekingIterator<T> it, String reference) { + @Nullable + private <T extends URNIdentified> T advanceUntilLargerThan(PeekingIterator<T> it, String reference) { if (reference == null) { return null; } @@ -160,7 +179,8 @@ public abstract class LoopingDatasetFinderSource<S, D> extends DatasetFinderSour private Iterator<PartitionableDataset.DatasetPartition> getPartitionIterator(PartitionableDataset dataset) { try { return this.currentPartitionIterator = sortStreamLexicographically( - dataset.getPartitions(Spliterator.SORTED, LoopingDatasetFinderSource.this.lexicographicalComparator)).iterator(); + dataset.getPartitions(Spliterator.SORTED, LoopingDatasetFinderSource.this.lexicographicalComparator)) + .iterator(); } catch (IOException ioe) { log.error("Failed to get partitions for dataset " + dataset.getUrn()); return Iterators.emptyIterator(); @@ -169,7 +189,15 @@ public abstract class LoopingDatasetFinderSource<S, D> extends DatasetFinderSour @Override protected WorkUnit computeNext() { - if (this.generatedWorkUnits >= this.maxWorkUnits) { + if (this.generatedWorkUnits == this.maxWorkUnits) { + /** + * Add a special noop workunit to the end of the stream. This workunit contains the Dataset/Partition + * URN of the "last" dataset/partition (in lexicographic order). This is useful to + * efficiently determine the next dataset/partition to process in the subsequent run. + */ + this.generatedWorkUnits++; + return generateNoopWorkUnit(); + } else if (this.generatedWorkUnits > this.maxWorkUnits) { return endOfData(); } @@ -177,8 +205,11 @@ public abstract class LoopingDatasetFinderSource<S, D> extends DatasetFinderSour if (this.currentPartitionIterator != null && this.currentPartitionIterator.hasNext()) { PartitionableDataset.DatasetPartition partition = this.currentPartitionIterator.next(); WorkUnit workUnit = workUnitForDatasetPartition(partition); - addDatasetInfoToWorkUnit(workUnit, partition.getDataset(), this.generatedWorkUnits++); + addDatasetInfoToWorkUnit(workUnit, partition.getDataset()); addPartitionInfoToWorkUnit(workUnit, partition); + this.previousDataset = partition.getDataset(); + this.previousPartition = partition; + this.generatedWorkUnits++; return workUnit; } @@ -187,27 +218,44 @@ public abstract class LoopingDatasetFinderSource<S, D> extends DatasetFinderSour this.currentPartitionIterator = getPartitionIterator((PartitionableDataset) dataset); } else { WorkUnit workUnit = workUnitForDataset(dataset); - addDatasetInfoToWorkUnit(workUnit, dataset, this.generatedWorkUnits++); + addDatasetInfoToWorkUnit(workUnit, dataset); + this.previousDataset = dataset; + this.generatedWorkUnits++; return workUnit; } } - - WorkUnit workUnit = NoopTask.noopWorkunit(); - workUnit.setProp(WORK_UNIT_ORDINAL, this.generatedWorkUnits); - + WorkUnit workUnit = generateNoopWorkUnit(); this.generatedWorkUnits = Integer.MAX_VALUE; - workUnit.setProp(END_OF_DATASETS_KEY, true); return workUnit; } - private void addDatasetInfoToWorkUnit(WorkUnit workUnit, Dataset dataset, int workUnitOrdinal) { - workUnit.setProp(DATASET_URN, dataset.getUrn()); - workUnit.setProp(WORK_UNIT_ORDINAL, workUnitOrdinal); + private void addDatasetInfoToWorkUnit(WorkUnit workUnit, Dataset dataset) { + if (isDatasetStateStoreEnabled) { + workUnit.setProp(ConfigurationKeys.DATASET_URN_KEY, dataset.getUrn()); + } } private void addPartitionInfoToWorkUnit(WorkUnit workUnit, PartitionableDataset.DatasetPartition partition) { - workUnit.setProp(PARTITION_URN, partition.getUrn()); + if (isDatasetStateStoreEnabled) { + workUnit.setProp(ConfigurationKeys.DATASET_URN_KEY, + Joiner.on(DATASET_PARTITION_DELIMITER).join(partition.getDataset().getUrn(), partition.getUrn())); + } + } + + private WorkUnit generateNoopWorkUnit() { + WorkUnit workUnit = NoopTask.noopWorkunit(); + workUnit.setProp(GLOBAL_WATERMARK_DATASET_KEY, true); + if (previousDataset != null) { + workUnit.setProp(DATASET_URN, previousDataset.getUrn()); + } + if (drilldownIntoPartitions && this.previousPartition != null) { + workUnit.setProp(PARTITION_URN, previousPartition.getUrn()); + } + if (isDatasetStateStoreEnabled) { + workUnit.setProp(ConfigurationKeys.DATASET_URN_KEY, ConfigurationKeys.GLOBAL_WATERMARK_DATASET_URN); + } + return workUnit; } } @@ -216,11 +264,10 @@ public abstract class LoopingDatasetFinderSource<S, D> extends DatasetFinderSour */ private <T extends URNIdentified> Stream<T> sortStreamLexicographically(Stream<T> inputStream) { Spliterator<T> spliterator = inputStream.spliterator(); - if (spliterator.hasCharacteristics(Spliterator.SORTED) && - spliterator.getComparator().equals(this.lexicographicalComparator)) { + if (spliterator.hasCharacteristics(Spliterator.SORTED) && spliterator.getComparator() + .equals(this.lexicographicalComparator)) { return StreamSupport.stream(spliterator, false); } return StreamSupport.stream(spliterator, false).sorted(this.lexicographicalComparator); } - } http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/b39bf8ca/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/source/LoopingDatasetFinderSourceTest.java ---------------------------------------------------------------------- diff --git a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/source/LoopingDatasetFinderSourceTest.java b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/source/LoopingDatasetFinderSourceTest.java index 76fe172..41745f1 100644 --- a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/source/LoopingDatasetFinderSourceTest.java +++ b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/source/LoopingDatasetFinderSourceTest.java @@ -14,14 +14,28 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - package org.apache.gobblin.data.management.source; import java.io.IOException; +import java.util.ArrayList; import java.util.List; import java.util.stream.Collectors; -import java.util.stream.Stream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.mockito.Mockito; +import org.testng.Assert; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; + +import org.apache.gobblin.metastore.DatasetStateStore; +import org.apache.gobblin.source.extractor.WatermarkInterval; +import org.apache.hadoop.conf.Configuration; +import org.apache.gobblin.configuration.ConfigurationKeys; import org.apache.gobblin.configuration.SourceState; import org.apache.gobblin.configuration.WorkUnitState; import org.apache.gobblin.dataset.Dataset; @@ -31,27 +45,48 @@ import org.apache.gobblin.dataset.test.SimpleDatasetForTesting; import org.apache.gobblin.dataset.test.SimpleDatasetPartitionForTesting; import org.apache.gobblin.dataset.test.SimplePartitionableDatasetForTesting; import org.apache.gobblin.dataset.test.StaticDatasetsFinderForTesting; +import org.apache.gobblin.runtime.FsDatasetStateStore; +import org.apache.gobblin.runtime.JobState; +import org.apache.gobblin.runtime.TaskState; import org.apache.gobblin.source.extractor.Extractor; +import org.apache.gobblin.source.extractor.extract.LongWatermark; import org.apache.gobblin.source.workunit.WorkUnit; import org.apache.gobblin.source.workunit.WorkUnitStream; -import org.mockito.Mockito; -import org.testng.Assert; -import org.testng.annotations.Test; -import com.google.common.collect.Lists; +import lombok.extern.slf4j.Slf4j; +@Slf4j public class LoopingDatasetFinderSourceTest { + private static final String TEST_JOB_NAME_1 = "TestJob1"; + private static final String TEST_JOB_NAME_2 = "TestJob2"; + private static final String TEST_JOB_ID = "TestJob11"; + private static final String TEST_TASK_ID_PREFIX = "TestTask-"; + private static final String TEST_STATE_STORE_ROOT_DIR = "/tmp/LoopingSourceTest"; + + private FsDatasetStateStore fsDatasetStateStore; + private long startTime = System.currentTimeMillis(); + + @BeforeClass + public void setUp() + throws IOException { + this.fsDatasetStateStore = new FsDatasetStateStore(ConfigurationKeys.LOCAL_FS_URI, TEST_STATE_STORE_ROOT_DIR); + + // clear data that may have been left behind by a prior test run + this.fsDatasetStateStore.delete(TEST_JOB_NAME_1); + this.fsDatasetStateStore.delete(TEST_JOB_NAME_2); + } @Test public void testNonDrilldown() { Dataset dataset1 = new SimpleDatasetForTesting("dataset1"); - Dataset dataset2 = new SimplePartitionableDatasetForTesting("dataset2", Lists.newArrayList(new SimpleDatasetPartitionForTesting("p1"), new SimpleDatasetPartitionForTesting("p2"))); + Dataset dataset2 = new SimplePartitionableDatasetForTesting("dataset2", + Lists.newArrayList(new SimpleDatasetPartitionForTesting("p1"), new SimpleDatasetPartitionForTesting("p2"))); Dataset dataset3 = new SimpleDatasetForTesting("dataset3"); Dataset dataset4 = new SimpleDatasetForTesting("dataset4"); Dataset dataset5 = new SimpleDatasetForTesting("dataset5"); - IterableDatasetFinder finder = new StaticDatasetsFinderForTesting( - Lists.newArrayList(dataset5, dataset4, dataset3, dataset2, dataset1)); + IterableDatasetFinder finder = + new StaticDatasetsFinderForTesting(Lists.newArrayList(dataset5, dataset4, dataset3, dataset2, dataset1)); MySource mySource = new MySource(false, finder); @@ -61,13 +96,8 @@ public class LoopingDatasetFinderSourceTest { WorkUnitStream workUnitStream = mySource.getWorkunitStream(sourceState); List<WorkUnit> workUnits = Lists.newArrayList(workUnitStream.getWorkUnits()); - Assert.assertEquals(workUnits.size(), 3); - Assert.assertEquals(workUnits.get(0).getProp(DatasetFinderSourceTest.DATASET_URN), "dataset1"); - Assert.assertNull(workUnits.get(0).getProp(DatasetFinderSourceTest.PARTITION_URN)); - Assert.assertEquals(workUnits.get(1).getProp(DatasetFinderSourceTest.DATASET_URN), "dataset2"); - Assert.assertNull(workUnits.get(1).getProp(DatasetFinderSourceTest.PARTITION_URN)); - Assert.assertEquals(workUnits.get(2).getProp(DatasetFinderSourceTest.DATASET_URN), "dataset3"); - Assert.assertNull(workUnits.get(2).getProp(DatasetFinderSourceTest.PARTITION_URN)); + Assert.assertEquals(workUnits.size(), 4); + verifyWorkUnitState(workUnits, "dataset3", null, false, false); // Second run should continue where it left off List<WorkUnitState> workUnitStates = workUnits.stream().map(WorkUnitState::new).collect(Collectors.toList()); @@ -78,11 +108,7 @@ public class LoopingDatasetFinderSourceTest { workUnits = Lists.newArrayList(workUnitStream.getWorkUnits()); Assert.assertEquals(workUnits.size(), 3); - Assert.assertEquals(workUnits.get(0).getProp(DatasetFinderSourceTest.DATASET_URN), "dataset4"); - Assert.assertNull(workUnits.get(0).getProp(DatasetFinderSourceTest.PARTITION_URN)); - Assert.assertEquals(workUnits.get(1).getProp(DatasetFinderSourceTest.DATASET_URN), "dataset5"); - Assert.assertNull(workUnits.get(1).getProp(DatasetFinderSourceTest.PARTITION_URN)); - Assert.assertTrue(workUnits.get(2).getPropAsBoolean(LoopingDatasetFinderSource.END_OF_DATASETS_KEY)); + verifyWorkUnitState(workUnits, "dataset5", null, true, false); // Loop around workUnitStates = workUnits.stream().map(WorkUnitState::new).collect(Collectors.toList()); @@ -91,28 +117,22 @@ public class LoopingDatasetFinderSourceTest { workUnitStream = mySource.getWorkunitStream(sourceStateSpy); workUnits = Lists.newArrayList(workUnitStream.getWorkUnits()); - Assert.assertEquals(workUnits.size(), 3); - Assert.assertEquals(workUnits.get(0).getProp(DatasetFinderSourceTest.DATASET_URN), "dataset1"); - Assert.assertNull(workUnits.get(0).getProp(DatasetFinderSourceTest.PARTITION_URN)); - Assert.assertEquals(workUnits.get(1).getProp(DatasetFinderSourceTest.DATASET_URN), "dataset2"); - Assert.assertNull(workUnits.get(1).getProp(DatasetFinderSourceTest.PARTITION_URN)); - Assert.assertEquals(workUnits.get(2).getProp(DatasetFinderSourceTest.DATASET_URN), "dataset3"); - Assert.assertNull(workUnits.get(2).getProp(DatasetFinderSourceTest.PARTITION_URN)); + Assert.assertEquals(workUnits.size(), 4); + verifyWorkUnitState(workUnits, "dataset3", null, false, false); } @Test public void testDrilldown() { // Create three datasets, two of them partitioned Dataset dataset1 = new SimpleDatasetForTesting("dataset1"); - Dataset dataset2 = new SimplePartitionableDatasetForTesting("dataset2", - Lists.newArrayList(new SimpleDatasetPartitionForTesting("p1"), - new SimpleDatasetPartitionForTesting("p2"), new SimpleDatasetPartitionForTesting("p3"))); - Dataset dataset3 = new SimplePartitionableDatasetForTesting("dataset3", - Lists.newArrayList(new SimpleDatasetPartitionForTesting("p1"), - new SimpleDatasetPartitionForTesting("p2"), new SimpleDatasetPartitionForTesting("p3"))); + Dataset dataset2 = new SimplePartitionableDatasetForTesting("dataset2", Lists + .newArrayList(new SimpleDatasetPartitionForTesting("p1"), new SimpleDatasetPartitionForTesting("p2"), + new SimpleDatasetPartitionForTesting("p3"))); + Dataset dataset3 = new SimplePartitionableDatasetForTesting("dataset3", Lists + .newArrayList(new SimpleDatasetPartitionForTesting("p1"), new SimpleDatasetPartitionForTesting("p2"), + new SimpleDatasetPartitionForTesting("p3"))); - IterableDatasetFinder finder = new StaticDatasetsFinderForTesting( - Lists.newArrayList(dataset3, dataset2, dataset1)); + IterableDatasetFinder finder = new StaticDatasetsFinderForTesting(Lists.newArrayList(dataset3, dataset2, dataset1)); MySource mySource = new MySource(true, finder); @@ -124,13 +144,8 @@ public class LoopingDatasetFinderSourceTest { WorkUnitStream workUnitStream = mySource.getWorkunitStream(sourceState); List<WorkUnit> workUnits = Lists.newArrayList(workUnitStream.getWorkUnits()); - Assert.assertEquals(workUnits.size(), 3); - Assert.assertEquals(workUnits.get(0).getProp(DatasetFinderSourceTest.DATASET_URN), "dataset1"); - Assert.assertNull(workUnits.get(0).getProp(DatasetFinderSourceTest.PARTITION_URN)); - Assert.assertEquals(workUnits.get(1).getProp(DatasetFinderSourceTest.DATASET_URN), "dataset2"); - Assert.assertEquals(workUnits.get(1).getProp(DatasetFinderSourceTest.PARTITION_URN), "p1"); - Assert.assertEquals(workUnits.get(2).getProp(DatasetFinderSourceTest.DATASET_URN), "dataset2"); - Assert.assertEquals(workUnits.get(2).getProp(DatasetFinderSourceTest.PARTITION_URN), "p2"); + Assert.assertEquals(workUnits.size(), 4); + verifyWorkUnitState(workUnits, "dataset2", "p2", false, false); // Second run should continue where it left off List<WorkUnitState> workUnitStates = workUnits.stream().map(WorkUnitState::new).collect(Collectors.toList()); @@ -140,13 +155,8 @@ public class LoopingDatasetFinderSourceTest { workUnitStream = mySource.getWorkunitStream(sourceStateSpy); workUnits = Lists.newArrayList(workUnitStream.getWorkUnits()); - Assert.assertEquals(workUnits.size(), 3); - Assert.assertEquals(workUnits.get(0).getProp(DatasetFinderSourceTest.DATASET_URN), "dataset2"); - Assert.assertEquals(workUnits.get(0).getProp(DatasetFinderSourceTest.PARTITION_URN), "p3"); - Assert.assertEquals(workUnits.get(1).getProp(DatasetFinderSourceTest.DATASET_URN), "dataset3"); - Assert.assertEquals(workUnits.get(1).getProp(DatasetFinderSourceTest.PARTITION_URN), "p1"); - Assert.assertEquals(workUnits.get(2).getProp(DatasetFinderSourceTest.DATASET_URN), "dataset3"); - Assert.assertEquals(workUnits.get(2).getProp(DatasetFinderSourceTest.PARTITION_URN), "p2"); + Assert.assertEquals(workUnits.size(), 4); + verifyWorkUnitState(workUnits, "dataset3", "p2", false, false); // third run, continue from where it left off workUnitStates = workUnits.stream().map(WorkUnitState::new).collect(Collectors.toList()); @@ -156,9 +166,7 @@ public class LoopingDatasetFinderSourceTest { workUnits = Lists.newArrayList(workUnitStream.getWorkUnits()); Assert.assertEquals(workUnits.size(), 2); - Assert.assertEquals(workUnits.get(0).getProp(DatasetFinderSourceTest.DATASET_URN), "dataset3"); - Assert.assertEquals(workUnits.get(0).getProp(DatasetFinderSourceTest.PARTITION_URN), "p3"); - Assert.assertTrue(workUnits.get(1).getPropAsBoolean(LoopingDatasetFinderSource.END_OF_DATASETS_KEY)); + verifyWorkUnitState(workUnits, "dataset3", "p3", true, false); // fourth run, finished all work units, loop around workUnitStates = workUnits.stream().map(WorkUnitState::new).collect(Collectors.toList()); @@ -167,40 +175,438 @@ public class LoopingDatasetFinderSourceTest { workUnitStream = mySource.getWorkunitStream(sourceStateSpy); workUnits = Lists.newArrayList(workUnitStream.getWorkUnits()); + Assert.assertEquals(workUnits.size(), 4); + verifyWorkUnitState(workUnits, "dataset2", "p2", false, false); + } + + @Test + public void testNonDrilldownDatasetState() + throws IOException { + Dataset dataset1 = new SimpleDatasetForTesting("dataset1"); + Dataset dataset2 = new SimplePartitionableDatasetForTesting("dataset2", + Lists.newArrayList(new SimpleDatasetPartitionForTesting("p1"), new SimpleDatasetPartitionForTesting("p2"))); + Dataset dataset3 = new SimpleDatasetForTesting("dataset3"); + Dataset dataset4 = new SimpleDatasetForTesting("dataset4"); + Dataset dataset5 = new SimpleDatasetForTesting("dataset5"); + + IterableDatasetFinder finder = + new StaticDatasetsFinderForTesting(Lists.newArrayList(dataset5, dataset4, dataset3, dataset2, dataset1)); + + MySource mySource = new MySource(false, finder, fsDatasetStateStore, TEST_JOB_NAME_1); + + SourceState sourceState = new SourceState(); + sourceState.setProp(LoopingDatasetFinderSource.MAX_WORK_UNITS_PER_RUN_KEY, 3); + sourceState.setProp(ConfigurationKeys.STATE_STORE_ROOT_DIR_KEY, TEST_STATE_STORE_ROOT_DIR); + sourceState.setProp(ConfigurationKeys.JOB_NAME_KEY, TEST_JOB_NAME_1); + WorkUnitStream workUnitStream = mySource.getWorkunitStream(sourceState, true); + List<WorkUnit> workUnits = Lists.newArrayList(workUnitStream.getWorkUnits()); + + Assert.assertEquals(workUnits.size(), 4); + List<LongWatermark> watermarks1 = new ArrayList<>(); + List<Dataset> datasets1 = new ArrayList<>(); + Assert.assertEquals(workUnits.get(0).getProp(ConfigurationKeys.DATASET_URN_KEY), "dataset1"); + Assert.assertEquals(workUnits.get(0).getLowWatermark(LongWatermark.class).getValue(), 0); + watermarks1.add(workUnits.get(0).getExpectedHighWatermark(LongWatermark.class)); + datasets1.add(dataset1); + + Assert.assertEquals(workUnits.get(1).getProp(ConfigurationKeys.DATASET_URN_KEY), "dataset2"); + Assert.assertEquals(workUnits.get(1).getLowWatermark(LongWatermark.class).getValue(), 0); + watermarks1.add(workUnits.get(1).getExpectedHighWatermark(LongWatermark.class)); + datasets1.add(dataset2); + + Assert.assertEquals(workUnits.get(2).getProp(ConfigurationKeys.DATASET_URN_KEY), "dataset3"); + Assert.assertEquals(workUnits.get(2).getLowWatermark(LongWatermark.class).getValue(), 0); + watermarks1.add(workUnits.get(2).getExpectedHighWatermark(LongWatermark.class)); + datasets1.add(dataset3); + + Assert.assertEquals(workUnits.get(3).getProp(ConfigurationKeys.DATASET_URN_KEY), + ConfigurationKeys.GLOBAL_WATERMARK_DATASET_URN); + + Dataset globalWmDataset = new SimpleDatasetForTesting(ConfigurationKeys.GLOBAL_WATERMARK_DATASET_URN); + datasets1.add(globalWmDataset); + + verifyWorkUnitState(workUnits,"dataset3", null, false, true); + persistDatasetState(datasets1, watermarks1, TEST_JOB_NAME_1); + testDatasetStates(datasets1, watermarks1, TEST_JOB_NAME_1); + + // Second run should continue where it left off + List<LongWatermark> watermarks2 = new ArrayList<>(); + List<Dataset> datasets2 = new ArrayList<>(); + + int workUnitSize = workUnits.size(); + List<WorkUnitState> workUnitStates = + workUnits.subList(workUnitSize - 1, workUnitSize).stream().map(WorkUnitState::new).collect(Collectors.toList()); + SourceState sourceStateSpy = Mockito.spy(sourceState); + Mockito.doReturn(workUnitStates).when(sourceStateSpy).getPreviousWorkUnitStates(ConfigurationKeys.GLOBAL_WATERMARK_DATASET_URN); + + workUnitStream = mySource.getWorkunitStream(sourceStateSpy,true); + workUnits = Lists.newArrayList(workUnitStream.getWorkUnits()); + Assert.assertEquals(workUnits.size(), 3); - Assert.assertEquals(workUnits.get(0).getProp(DatasetFinderSourceTest.DATASET_URN), "dataset1"); - Assert.assertNull(workUnits.get(0).getProp(DatasetFinderSourceTest.PARTITION_URN)); - Assert.assertEquals(workUnits.get(1).getProp(DatasetFinderSourceTest.DATASET_URN), "dataset2"); - Assert.assertEquals(workUnits.get(1).getProp(DatasetFinderSourceTest.PARTITION_URN), "p1"); - Assert.assertEquals(workUnits.get(2).getProp(DatasetFinderSourceTest.DATASET_URN), "dataset2"); - Assert.assertEquals(workUnits.get(2).getProp(DatasetFinderSourceTest.PARTITION_URN), "p2"); + Assert.assertEquals(workUnits.get(0).getProp(ConfigurationKeys.DATASET_URN_KEY), "dataset4"); + Assert.assertEquals(workUnits.get(0).getLowWatermark(LongWatermark.class).getValue(), 0); + watermarks2.add(workUnits.get(0).getExpectedHighWatermark(LongWatermark.class)); + datasets2.add(dataset4); + + Assert.assertEquals(workUnits.get(1).getProp(ConfigurationKeys.DATASET_URN_KEY), "dataset5"); + Assert.assertEquals(workUnits.get(1).getLowWatermark(LongWatermark.class).getValue(), 0); + watermarks2.add(workUnits.get(1).getExpectedHighWatermark(LongWatermark.class)); + datasets2.add(dataset5); + + Assert.assertTrue(workUnits.get(2).getPropAsBoolean(LoopingDatasetFinderSource.END_OF_DATASETS_KEY)); + Assert.assertEquals(workUnits.get(2).getProp(ConfigurationKeys.DATASET_URN_KEY), ConfigurationKeys.GLOBAL_WATERMARK_DATASET_URN); + datasets2.add(globalWmDataset); + + verifyWorkUnitState(workUnits,"dataset5",null,true, true); + persistDatasetState(datasets2, watermarks2, TEST_JOB_NAME_1); + testDatasetStates(datasets2, watermarks2, TEST_JOB_NAME_1); + + // Loop around + List<LongWatermark> watermarks3 = new ArrayList<>(); + List<Dataset> datasets3 = new ArrayList<>(); + + workUnitSize = workUnits.size(); + workUnitStates = workUnits.subList(workUnitSize - 1, workUnitSize).stream().map(WorkUnitState::new).collect(Collectors.toList()); + Mockito.doReturn(workUnitStates).when(sourceStateSpy).getPreviousWorkUnitStates(ConfigurationKeys.GLOBAL_WATERMARK_DATASET_URN); + + workUnitStream = mySource.getWorkunitStream(sourceStateSpy,true); + workUnits = Lists.newArrayList(workUnitStream.getWorkUnits()); + + Assert.assertEquals(workUnits.size(), 4); + Assert.assertEquals(workUnits.get(0).getProp(ConfigurationKeys.DATASET_URN_KEY), "dataset1"); + Assert.assertEquals(workUnits.get(0).getLowWatermark(LongWatermark.class).getValue(), watermarks1.get(0).getValue()); + watermarks3.add(workUnits.get(0).getExpectedHighWatermark(LongWatermark.class)); + datasets3.add(dataset1); + + Assert.assertEquals(workUnits.get(1).getProp(ConfigurationKeys.DATASET_URN_KEY), "dataset2"); + Assert.assertEquals(workUnits.get(1).getLowWatermark(LongWatermark.class).getValue(), watermarks1.get(1).getValue()); + watermarks3.add(workUnits.get(1).getExpectedHighWatermark(LongWatermark.class)); + datasets3.add(dataset2); + + Assert.assertEquals(workUnits.get(2).getProp(ConfigurationKeys.DATASET_URN_KEY), "dataset3"); + Assert.assertEquals(workUnits.get(2).getLowWatermark(LongWatermark.class).getValue(), watermarks1.get(2).getValue()); + watermarks3.add(workUnits.get(2).getExpectedHighWatermark(LongWatermark.class)); + datasets3.add(dataset3); + + Assert.assertEquals(workUnits.get(3).getProp(ConfigurationKeys.DATASET_URN_KEY), ConfigurationKeys.GLOBAL_WATERMARK_DATASET_URN); + datasets3.add(globalWmDataset); + + verifyWorkUnitState(workUnits,"dataset3",null,false, true); + persistDatasetState(datasets3, watermarks3, TEST_JOB_NAME_1); + testDatasetStates(datasets3, watermarks3, TEST_JOB_NAME_1); + } + + @Test + public void testDrilldownDatasetState() + throws IOException { + // Create three datasets, two of them partitioned + Dataset dataset1 = new SimpleDatasetForTesting("dataset1"); + Dataset dataset2 = new SimplePartitionableDatasetForTesting("dataset2", Lists + .newArrayList(new SimpleDatasetPartitionForTesting("p1"), new SimpleDatasetPartitionForTesting("p2"), + new SimpleDatasetPartitionForTesting("p3"))); + Dataset dataset3 = new SimplePartitionableDatasetForTesting("dataset3", Lists + .newArrayList(new SimpleDatasetPartitionForTesting("p1"), new SimpleDatasetPartitionForTesting("p2"), + new SimpleDatasetPartitionForTesting("p3"))); + + IterableDatasetFinder finder = new StaticDatasetsFinderForTesting(Lists.newArrayList(dataset3, dataset2, dataset1)); + + MySource mySource = new MySource(true, finder, fsDatasetStateStore, TEST_JOB_NAME_2); + + // Limit to 3 wunits per run + SourceState sourceState = new SourceState(); + sourceState.setProp(LoopingDatasetFinderSource.MAX_WORK_UNITS_PER_RUN_KEY, 3); + sourceState.setProp(ConfigurationKeys.STATE_STORE_ROOT_DIR_KEY, TEST_STATE_STORE_ROOT_DIR); + sourceState.setProp(ConfigurationKeys.JOB_NAME_KEY, TEST_JOB_NAME_2); + + // first run, get three first work units + WorkUnitStream workUnitStream = mySource.getWorkunitStream(sourceState,true); + List<WorkUnit> workUnits = Lists.newArrayList(workUnitStream.getWorkUnits()); + + List<LongWatermark> watermarks1 = new ArrayList<>(); + List<Dataset> datasets1 = new ArrayList<>(); + + Assert.assertEquals(workUnits.size(), 4); + Assert.assertEquals(workUnits.get(0).getProp(ConfigurationKeys.DATASET_URN_KEY), "dataset1"); + Assert.assertEquals(workUnits.get(0).getLowWatermark(LongWatermark.class).getValue(), 0); + watermarks1.add(workUnits.get(0).getExpectedHighWatermark(LongWatermark.class)); + datasets1.add(dataset1); + + Assert.assertEquals(workUnits.get(1).getProp(ConfigurationKeys.DATASET_URN_KEY), "dataset2@p1"); + Assert.assertEquals(workUnits.get(1).getLowWatermark(LongWatermark.class).getValue(), 0); + watermarks1.add(workUnits.get(1).getExpectedHighWatermark(LongWatermark.class)); + datasets1.add(new SimpleDatasetForTesting("dataset2@p1")); + + Assert.assertEquals(workUnits.get(2).getProp(ConfigurationKeys.DATASET_URN_KEY), "dataset2@p2"); + Assert.assertEquals(workUnits.get(2).getLowWatermark(LongWatermark.class).getValue(), 0); + watermarks1.add(workUnits.get(2).getExpectedHighWatermark(LongWatermark.class)); + datasets1.add(new SimpleDatasetForTesting("dataset2@p2")); + + Assert.assertEquals(workUnits.get(3).getProp(ConfigurationKeys.DATASET_URN_KEY), ConfigurationKeys.GLOBAL_WATERMARK_DATASET_URN); + Assert.assertEquals(workUnits.get(3).getProp(LoopingDatasetFinderSource.DATASET_URN), "dataset2"); + Assert.assertEquals(workUnits.get(3).getProp(LoopingDatasetFinderSource.PARTITION_URN), "p2"); + Dataset globalWmDataset = new SimpleDatasetForTesting(ConfigurationKeys.GLOBAL_WATERMARK_DATASET_URN); + datasets1.add(globalWmDataset); + + verifyWorkUnitState(workUnits,"dataset2","p2",false, true); + persistDatasetState(datasets1, watermarks1, TEST_JOB_NAME_2); + testDatasetStates(datasets1, watermarks1, TEST_JOB_NAME_2); + + // Second run should continue where it left off + int workUnitSize = workUnits.size(); + List<WorkUnitState> workUnitStates = + workUnits.subList(workUnitSize - 1, workUnitSize).stream().map(WorkUnitState::new).collect(Collectors.toList()); + + List<LongWatermark> watermarks2 = new ArrayList<>(); + List<Dataset> datasets2 = new ArrayList<>(); + + SourceState sourceStateSpy = Mockito.spy(sourceState); + Mockito.doReturn(workUnitStates).when(sourceStateSpy).getPreviousWorkUnitStates(ConfigurationKeys.GLOBAL_WATERMARK_DATASET_URN); + + workUnitStream = mySource.getWorkunitStream(sourceStateSpy,true); + workUnits = Lists.newArrayList(workUnitStream.getWorkUnits()); + + Assert.assertEquals(workUnits.size(), 4); + Assert.assertEquals(workUnits.get(0).getProp(ConfigurationKeys.DATASET_URN_KEY), "dataset2@p3"); + Assert.assertEquals(workUnits.get(0).getLowWatermark(LongWatermark.class).getValue(), 0); + watermarks2.add(workUnits.get(0).getExpectedHighWatermark(LongWatermark.class)); + datasets2.add(new SimpleDatasetForTesting("dataset2@p3")); + + Assert.assertEquals(workUnits.get(1).getProp(ConfigurationKeys.DATASET_URN_KEY), "dataset3@p1"); + Assert.assertEquals(workUnits.get(1).getLowWatermark(LongWatermark.class).getValue(), 0); + watermarks2.add(workUnits.get(1).getExpectedHighWatermark(LongWatermark.class)); + datasets2.add(new SimpleDatasetForTesting("dataset3@p1")); + + Assert.assertEquals(workUnits.get(2).getProp(ConfigurationKeys.DATASET_URN_KEY), "dataset3@p2"); + Assert.assertEquals(workUnits.get(2).getLowWatermark(LongWatermark.class).getValue(), 0); + watermarks2.add(workUnits.get(2).getExpectedHighWatermark(LongWatermark.class)); + datasets2.add(new SimpleDatasetForTesting("dataset3@p2")); + + Assert.assertEquals(workUnits.get(3).getProp(ConfigurationKeys.DATASET_URN_KEY), ConfigurationKeys.GLOBAL_WATERMARK_DATASET_URN); + Assert.assertEquals(workUnits.get(3).getProp(LoopingDatasetFinderSource.DATASET_URN), "dataset3"); + Assert.assertEquals(workUnits.get(3).getProp(LoopingDatasetFinderSource.PARTITION_URN), "p2"); + datasets2.add(globalWmDataset); + + verifyWorkUnitState(workUnits,"dataset3","p2",false, true); + persistDatasetState(datasets2, watermarks2, TEST_JOB_NAME_2); + testDatasetStates(datasets2, watermarks2, TEST_JOB_NAME_2); + + // third run, continue from where it left off + workUnitSize = workUnits.size(); + workUnitStates = + workUnits.subList(workUnitSize - 1, workUnitSize).stream().map(WorkUnitState::new).collect(Collectors.toList()); + Mockito.doReturn(workUnitStates).when(sourceStateSpy).getPreviousWorkUnitStates(ConfigurationKeys.GLOBAL_WATERMARK_DATASET_URN); + + List<LongWatermark> watermarks3 = new ArrayList<>(); + List<Dataset> datasets3 = new ArrayList<>(); + + workUnitStream = mySource.getWorkunitStream(sourceStateSpy,true); + workUnits = Lists.newArrayList(workUnitStream.getWorkUnits()); + + Assert.assertEquals(workUnits.size(), 2); + Assert.assertEquals(workUnits.get(0).getProp(ConfigurationKeys.DATASET_URN_KEY), "dataset3@p3"); + Assert.assertEquals(workUnits.get(0).getLowWatermark(LongWatermark.class).getValue(), 0); + watermarks3.add(workUnits.get(0).getExpectedHighWatermark(LongWatermark.class)); + datasets3.add(new SimpleDatasetForTesting("dataset3@p3")); + + Assert.assertTrue(workUnits.get(1).getPropAsBoolean(LoopingDatasetFinderSource.END_OF_DATASETS_KEY)); + Assert.assertEquals(workUnits.get(1).getProp(ConfigurationKeys.DATASET_URN_KEY), ConfigurationKeys.GLOBAL_WATERMARK_DATASET_URN); + Assert.assertEquals(workUnits.get(1).getProp(LoopingDatasetFinderSource.DATASET_URN), "dataset3"); + Assert.assertEquals(workUnits.get(1).getProp(LoopingDatasetFinderSource.PARTITION_URN), "p3"); + datasets3.add(globalWmDataset); + + verifyWorkUnitState(workUnits,"dataset3","p3",true, true); + persistDatasetState(datasets3, watermarks3, TEST_JOB_NAME_2); + testDatasetStates(datasets3, watermarks3, TEST_JOB_NAME_2); + + // fourth run, finished all work units, loop around + workUnitSize = workUnits.size(); + workUnitStates = + workUnits.subList(workUnitSize - 1, workUnitSize).stream().map(WorkUnitState::new).collect(Collectors.toList()); + Mockito.doReturn(workUnitStates).when(sourceStateSpy).getPreviousWorkUnitStates(ConfigurationKeys.GLOBAL_WATERMARK_DATASET_URN); + + List<LongWatermark> watermarks4 = new ArrayList<>(); + List<Dataset> datasets4 = new ArrayList<>(); + + workUnitStream = mySource.getWorkunitStream(sourceStateSpy,true); + workUnits = Lists.newArrayList(workUnitStream.getWorkUnits()); + + Assert.assertEquals(workUnits.size(), 4); + Assert.assertEquals(workUnits.get(0).getProp(ConfigurationKeys.DATASET_URN_KEY), "dataset1"); + Assert.assertEquals(workUnits.get(0).getLowWatermark(LongWatermark.class).getValue(), watermarks1.get(0).getValue()); + watermarks4.add(workUnits.get(0).getExpectedHighWatermark(LongWatermark.class)); + datasets4.add(new SimpleDatasetForTesting("dataset1")); + + Assert.assertEquals(workUnits.get(1).getProp(ConfigurationKeys.DATASET_URN_KEY), "dataset2@p1"); + Assert.assertEquals(workUnits.get(1).getLowWatermark(LongWatermark.class).getValue(), watermarks1.get(1).getValue()); + watermarks4.add(workUnits.get(1).getExpectedHighWatermark(LongWatermark.class)); + datasets4.add(new SimpleDatasetForTesting("dataset2@p1")); + + Assert.assertEquals(workUnits.get(2).getProp(ConfigurationKeys.DATASET_URN_KEY), "dataset2@p2"); + Assert.assertEquals(workUnits.get(2).getLowWatermark(LongWatermark.class).getValue(), watermarks1.get(2).getValue()); + watermarks4.add(workUnits.get(2).getExpectedHighWatermark(LongWatermark.class)); + datasets4.add(new SimpleDatasetForTesting("dataset2@p2")); + + Assert.assertEquals(workUnits.get(3).getProp(ConfigurationKeys.DATASET_URN_KEY), ConfigurationKeys.GLOBAL_WATERMARK_DATASET_URN); + datasets4.add(new SimpleDatasetForTesting(ConfigurationKeys.GLOBAL_WATERMARK_DATASET_URN)); + + verifyWorkUnitState(workUnits,"dataset2","p2",false,true); + persistDatasetState(datasets4, watermarks4, TEST_JOB_NAME_2); + testDatasetStates(datasets4, watermarks4, TEST_JOB_NAME_2); + } + + public void verifyWorkUnitState(List<WorkUnit> workUnits, String datasetUrn, String partitionUrn, + boolean endOfDatasets, boolean isDatasetStateStoreEnabled) { + int i; + for (i = 0; i < workUnits.size() - 1; i++) { + Assert.assertNull(workUnits.get(i).getProp(LoopingDatasetFinderSource.DATASET_URN)); + Assert.assertNull(workUnits.get(i).getProp(LoopingDatasetFinderSource.PARTITION_URN)); + if(!isDatasetStateStoreEnabled) { + Assert.assertNull(workUnits.get(i).getProp(ConfigurationKeys.DATASET_URN_KEY)); + } + Assert.assertNull(workUnits.get(i).getProp(LoopingDatasetFinderSource.GLOBAL_WATERMARK_DATASET_KEY)); + Assert.assertNull(workUnits.get(i).getProp(LoopingDatasetFinderSource.END_OF_DATASETS_KEY)); + } + Assert.assertEquals(workUnits.get(i).getProp(LoopingDatasetFinderSource.DATASET_URN), datasetUrn); + if (partitionUrn != null) { + Assert.assertEquals(workUnits.get(i).getProp(LoopingDatasetFinderSource.PARTITION_URN), partitionUrn); + } else { + Assert.assertNull(workUnits.get(i).getProp(LoopingDatasetFinderSource.PARTITION_URN)); + } + if (!endOfDatasets) { + Assert.assertNull(workUnits.get(i).getProp(LoopingDatasetFinderSource.END_OF_DATASETS_KEY)); + } else { + Assert.assertTrue(workUnits.get(i).getPropAsBoolean(LoopingDatasetFinderSource.END_OF_DATASETS_KEY)); + } + Assert + .assertEquals(workUnits.get(i).getPropAsBoolean(LoopingDatasetFinderSource.GLOBAL_WATERMARK_DATASET_KEY), true); + } + + public void persistDatasetState(List<Dataset> datasets, List<LongWatermark> watermarks, String jobName) + throws IOException { + Preconditions.checkArgument(datasets.size() >= 2); + for (int i = 0; i < datasets.size(); i++) { + String datasetUrn = datasets.get(i).getUrn(); + JobState.DatasetState datasetState = new JobState.DatasetState(jobName, TEST_JOB_ID); + + datasetState.setDatasetUrn(datasetUrn); + datasetState.setState(JobState.RunningState.COMMITTED); + datasetState.setId(datasetUrn); + datasetState.setStartTime(this.startTime); + datasetState.setEndTime(this.startTime + 1000); + datasetState.setDuration(1000); + + TaskState taskState = new TaskState(); + taskState.setJobId(TEST_JOB_ID); + taskState.setTaskId(TEST_TASK_ID_PREFIX + i); + taskState.setId(TEST_TASK_ID_PREFIX + i); + taskState.setWorkingState(WorkUnitState.WorkingState.COMMITTED); + if (i < datasets.size() - 1) { + taskState.setActualHighWatermark(watermarks.get(i)); + } + datasetState.addTaskState(taskState); + + this.fsDatasetStateStore.persistDatasetState(datasetUrn, datasetState); + } + } + + private void testDatasetStates(List<Dataset> datasets, List<LongWatermark> watermarks, String jobName) + throws IOException { + Preconditions.checkArgument(datasets.size() >= 2); + for (int i = 0; i < datasets.size(); i++) { + JobState.DatasetState datasetState = + this.fsDatasetStateStore.getLatestDatasetState(jobName, datasets.get(i).getUrn()); + + Assert.assertEquals(datasetState.getDatasetUrn(), datasets.get(i).getUrn()); + Assert.assertEquals(datasetState.getJobName(), jobName); + Assert.assertEquals(datasetState.getJobId(), TEST_JOB_ID); + Assert.assertEquals(datasetState.getState(), JobState.RunningState.COMMITTED); + Assert.assertEquals(datasetState.getStartTime(), this.startTime); + Assert.assertEquals(datasetState.getEndTime(), this.startTime + 1000); + Assert.assertEquals(datasetState.getDuration(), 1000); + + Assert.assertEquals(datasetState.getCompletedTasks(), 1); + TaskState taskState = datasetState.getTaskStates().get(0); + Assert.assertEquals(taskState.getJobId(), TEST_JOB_ID); + Assert.assertEquals(taskState.getTaskId(), TEST_TASK_ID_PREFIX + i); + Assert.assertEquals(taskState.getId(), TEST_TASK_ID_PREFIX + i); + Assert.assertEquals(taskState.getWorkingState(), WorkUnitState.WorkingState.COMMITTED); + if (i < datasets.size() - 1) { + Assert.assertEquals(taskState.getActualHighWatermark(LongWatermark.class).getValue(), + watermarks.get(i).getValue()); + } + } } public static class MySource extends LoopingDatasetFinderSource<String, String> { private final IterableDatasetFinder datasetsFinder; + private boolean isDatasetStateStoreEnabled; + private DatasetStateStore fsDatasetStateStore; + private String jobName; + private Long LAST_PROCESSED_TS = System.currentTimeMillis(); + + MySource(boolean drilldownIntoPartitions, IterableDatasetFinder datasetsFinder) { + super(drilldownIntoPartitions); + this.datasetsFinder = datasetsFinder; + this.isDatasetStateStoreEnabled = false; + } - public MySource(boolean drilldownIntoPartitions, IterableDatasetFinder datasetsFinder) { + MySource(boolean drilldownIntoPartitions, IterableDatasetFinder datasetsFinder, + FsDatasetStateStore fsDatasetStateStore, String jobName) { super(drilldownIntoPartitions); this.datasetsFinder = datasetsFinder; + this.isDatasetStateStoreEnabled = true; + this.fsDatasetStateStore = fsDatasetStateStore; + this.jobName = jobName; } @Override - public Extractor<String, String> getExtractor(WorkUnitState state) throws IOException { + public Extractor<String, String> getExtractor(WorkUnitState state) + throws IOException { return null; } @Override protected WorkUnit workUnitForDataset(Dataset dataset) { WorkUnit workUnit = new WorkUnit(); - workUnit.setProp(DatasetFinderSourceTest.DATASET_URN, dataset.getUrn()); + if(isDatasetStateStoreEnabled) { + JobState.DatasetState datasetState = null; + try { + datasetState = + (JobState.DatasetState) this.fsDatasetStateStore.getLatestDatasetState(this.jobName, dataset.getUrn()); + } catch (IOException e) { + throw new RuntimeException(e); + } + LongWatermark previousWatermark; + if(datasetState != null) { + previousWatermark = datasetState.getTaskStatesAsWorkUnitStates().get(0).getActualHighWatermark(LongWatermark.class); + } else { + previousWatermark = new LongWatermark(0); + } + workUnit.setWatermarkInterval(new WatermarkInterval(previousWatermark, new LongWatermark(LAST_PROCESSED_TS))); + } return workUnit; } @Override protected WorkUnit workUnitForDatasetPartition(PartitionableDataset.DatasetPartition partition) { WorkUnit workUnit = new WorkUnit(); - workUnit.setProp(DatasetFinderSourceTest.DATASET_URN, partition.getDataset().getUrn()); - workUnit.setProp(DatasetFinderSourceTest.PARTITION_URN, partition.getUrn()); + if(isDatasetStateStoreEnabled) { + String datasetUrn = partition.getDataset().getUrn()+"@"+partition.getUrn(); + JobState.DatasetState datasetState = null; + try { + datasetState = + (JobState.DatasetState) this.fsDatasetStateStore.getLatestDatasetState(this.jobName, datasetUrn); + } catch (IOException e) { + throw new RuntimeException(e); + } + LongWatermark previousWatermark; + if(datasetState != null) { + previousWatermark = datasetState.getTaskStatesAsWorkUnitStates().get(0).getActualHighWatermark(LongWatermark.class); + } else { + previousWatermark = new LongWatermark(0); + } + workUnit.setWatermarkInterval(new WatermarkInterval(previousWatermark, new LongWatermark(LAST_PROCESSED_TS))); + } return workUnit; } @@ -210,9 +616,19 @@ public class LoopingDatasetFinderSourceTest { } @Override - protected IterableDatasetFinder createDatasetsFinder(SourceState state) throws IOException { + protected IterableDatasetFinder createDatasetsFinder(SourceState state) + throws IOException { return this.datasetsFinder; } } + @AfterClass + public void tearDown() + throws IOException { + FileSystem fs = FileSystem.getLocal(new Configuration(false)); + Path rootDir = new Path(TEST_STATE_STORE_ROOT_DIR); + if (fs.exists(rootDir)) { + fs.delete(rootDir, true); + } + } } http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/b39bf8ca/gobblin-modules/gobblin-helix/src/main/java/org/apache/gobblin/runtime/ZkDatasetStateStore.java ---------------------------------------------------------------------- diff --git a/gobblin-modules/gobblin-helix/src/main/java/org/apache/gobblin/runtime/ZkDatasetStateStore.java b/gobblin-modules/gobblin-helix/src/main/java/org/apache/gobblin/runtime/ZkDatasetStateStore.java index db882c0..fdde5eb 100644 --- a/gobblin-modules/gobblin-helix/src/main/java/org/apache/gobblin/runtime/ZkDatasetStateStore.java +++ b/gobblin-modules/gobblin-helix/src/main/java/org/apache/gobblin/runtime/ZkDatasetStateStore.java @@ -93,7 +93,7 @@ public class ZkDatasetStateStore extends ZkStateStore<JobState.DatasetState> public JobState.DatasetState getLatestDatasetState(String storeName, String datasetUrn) throws IOException { String alias = Strings.isNullOrEmpty(datasetUrn) ? CURRENT_DATASET_STATE_FILE_SUFFIX + DATASET_STATE_STORE_TABLE_SUFFIX - : datasetUrn + "-" + CURRENT_DATASET_STATE_FILE_SUFFIX + DATASET_STATE_STORE_TABLE_SUFFIX; + : CharMatcher.is(':').replaceFrom(datasetUrn, '.') + "-" + CURRENT_DATASET_STATE_FILE_SUFFIX + DATASET_STATE_STORE_TABLE_SUFFIX; return get(storeName, alias, datasetUrn); } http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/b39bf8ca/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/CombinedWorkUnitAndDatasetStateGenerator.java ---------------------------------------------------------------------- diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/CombinedWorkUnitAndDatasetStateGenerator.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/CombinedWorkUnitAndDatasetStateGenerator.java new file mode 100644 index 0000000..3e2ccfb --- /dev/null +++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/CombinedWorkUnitAndDatasetStateGenerator.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gobblin.runtime; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Map; + +import org.apache.gobblin.configuration.CombinedWorkUnitAndDatasetState; +import org.apache.gobblin.configuration.CombinedWorkUnitAndDatasetStateFunctional; +import org.apache.gobblin.configuration.WorkUnitState; +import org.apache.gobblin.metastore.DatasetStateStore; + +import com.google.common.base.Strings; +import com.google.common.collect.ImmutableMap; + + +/** + * A class that returns previous {@link WorkUnitState}s and {@link JobState.DatasetState}s from the state store + * as a {@link CombinedWorkUnitAndDatasetState}. + */ + +public class CombinedWorkUnitAndDatasetStateGenerator implements CombinedWorkUnitAndDatasetStateFunctional { + private DatasetStateStore datasetStateStore; + private String jobName; + + /** + * Constructor. + * + * @param datasetStateStore the dataset state store + * @param jobName the job name + */ + public CombinedWorkUnitAndDatasetStateGenerator(DatasetStateStore datasetStateStore, String jobName) { + this.datasetStateStore = datasetStateStore; + this.jobName = jobName; + } + + @Override + public CombinedWorkUnitAndDatasetState getCombinedWorkUnitAndDatasetState(String datasetUrn) + throws Exception { + Map<String, JobState.DatasetState> datasetStateMap = ImmutableMap.of(); + List<WorkUnitState> workUnitStates = new ArrayList<>(); + if (Strings.isNullOrEmpty(datasetUrn)) { + datasetStateMap = this.datasetStateStore.getLatestDatasetStatesByUrns(this.jobName); + workUnitStates = JobState.workUnitStatesFromDatasetStates(datasetStateMap.values()); + } else { + JobState.DatasetState datasetState = + (JobState.DatasetState) this.datasetStateStore.getLatestDatasetState(this.jobName, datasetUrn); + if (datasetState != null) { + datasetStateMap = ImmutableMap.of(datasetUrn, datasetState); + workUnitStates = JobState.workUnitStatesFromDatasetStates(Arrays.asList(datasetState)); + } + } + return new CombinedWorkUnitAndDatasetState(workUnitStates, datasetStateMap); + } +} http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/b39bf8ca/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/FsDatasetStateStore.java ---------------------------------------------------------------------- diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/FsDatasetStateStore.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/FsDatasetStateStore.java index 9da34d7..5ccf985 100644 --- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/FsDatasetStateStore.java +++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/FsDatasetStateStore.java @@ -30,11 +30,6 @@ import java.util.concurrent.ExecutionException; import java.util.stream.Collectors; import java.util.stream.Stream; -import org.apache.gobblin.metastore.predicates.StateStorePredicate; -import org.apache.gobblin.metastore.predicates.StoreNamePredicate; -import org.apache.gobblin.runtime.metastore.filesystem.FsDatasetStateStoreEntryManager; -import org.apache.gobblin.util.filters.HiddenFilter; -import org.apache.gobblin.util.hadoop.GobblinSequenceFileReader; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; @@ -58,17 +53,22 @@ import com.google.common.collect.Lists; import com.typesafe.config.Config; import com.typesafe.config.ConfigValue; -import org.apache.gobblin.configuration.ConfigurationKeys; +import org.apache.gobblin.metastore.predicates.StateStorePredicate; +import org.apache.gobblin.metastore.predicates.StoreNamePredicate; import org.apache.gobblin.metastore.DatasetStateStore; import org.apache.gobblin.metastore.FsStateStore; import org.apache.gobblin.metastore.nameParser.DatasetUrnStateStoreNameParser; import org.apache.gobblin.metastore.nameParser.SimpleDatasetUrnStateStoreNameParser; +import org.apache.gobblin.runtime.metastore.filesystem.FsDatasetStateStoreEntryManager; +import org.apache.gobblin.configuration.ConfigurationKeys; import org.apache.gobblin.util.ConfigUtils; import org.apache.gobblin.util.Either; import org.apache.gobblin.util.ExecutorsUtils; import org.apache.gobblin.util.WritableShimSerialization; import org.apache.gobblin.util.executors.IteratorExecutor; import org.apache.gobblin.util.reflection.GobblinConstructorUtils; +import org.apache.gobblin.util.filters.HiddenFilter; +import org.apache.gobblin.util.hadoop.GobblinSequenceFileReader; /** @@ -187,7 +187,8 @@ public class FsDatasetStateStore extends FsStateStore<JobState.DatasetState> imp return getInternal(storeName, tableName, stateId, false); } - public JobState.DatasetState getInternal(String storeName, String tableName, String stateId, boolean sanitizeKeyForComparison) + public JobState.DatasetState getInternal(String storeName, String tableName, String stateId, + boolean sanitizeKeyForComparison) throws IOException { Path tablePath = new Path(new Path(this.storeRootDir, storeName), tableName); @@ -206,8 +207,9 @@ public class FsDatasetStateStore extends FsStateStore<JobState.DatasetState> imp Text key = new Text(); while (reader.next(key)) { - String stringKey = sanitizeKeyForComparison ? - sanitizeDatasetStatestoreNameFromDatasetURN(storeName, key.toString()) : key.toString(); + String stringKey = + sanitizeKeyForComparison ? sanitizeDatasetStatestoreNameFromDatasetURN(storeName, key.toString()) + : key.toString(); writable = reader.getCurrentValue(writable); if (stringKey.equals(stateId)) { if (writable instanceof JobState.DatasetState) { @@ -235,19 +237,19 @@ public class FsDatasetStateStore extends FsStateStore<JobState.DatasetState> imp Configuration deserializeConfig = new Configuration(this.conf); WritableShimSerialization.addToHadoopConfiguration(deserializeConfig); - try (@SuppressWarnings("deprecation") GobblinSequenceFileReader reader = new GobblinSequenceFileReader(this.fs, tablePath, - deserializeConfig)) { + try (@SuppressWarnings("deprecation") GobblinSequenceFileReader reader = new GobblinSequenceFileReader(this.fs, + tablePath, deserializeConfig)) { /** * Add this change so that all stateful flow will have back compatibility. * Shim layer of state store is therefore avoided because of this change. * Keep the implementation of Shim layer temporarily. */ - String className = reader.getValueClassName(); - if (className.startsWith("gobblin")) { - LOGGER.warn("There's old JobState with no apache package name being read while we cast them at runtime"); - className = "org.apache." + className; - } + String className = reader.getValueClassName(); + if (className.startsWith("gobblin")) { + LOGGER.warn("There's old JobState with no apache package name being read while we cast them at runtime"); + className = "org.apache." + className; + } if (!className.equals(JobState.class.getName()) && !className.equals(JobState.DatasetState.class.getName())) { throw new RuntimeException("There is a mismatch in the Class Type of state in state-store and that in runtime"); @@ -363,8 +365,8 @@ public class FsDatasetStateStore extends FsStateStore<JobState.DatasetState> imp String alias = Strings.isNullOrEmpty(datasetUrn) ? CURRENT_DATASET_STATE_FILE_SUFFIX + DATASET_STATE_STORE_TABLE_SUFFIX - : sanitizeDatasetStatestoreNameFromDatasetURN(storeName, datasetUrn) + "-" - + CURRENT_DATASET_STATE_FILE_SUFFIX + DATASET_STATE_STORE_TABLE_SUFFIX; + : sanitizeDatasetStatestoreNameFromDatasetURN(storeName, CharMatcher.is(':').replaceFrom(datasetUrn, '.')) + + "-" + CURRENT_DATASET_STATE_FILE_SUFFIX + DATASET_STATE_STORE_TABLE_SUFFIX; return get(storeName, alias, datasetUrn); } @@ -424,9 +426,9 @@ public class FsDatasetStateStore extends FsStateStore<JobState.DatasetState> imp public List<FsDatasetStateStoreEntryManager> getMetadataForTables(StateStorePredicate predicate) throws IOException { - Stream<Path> stores = predicate instanceof StoreNamePredicate ? - Stream.of(new Path(this.storeRootDir, ((StoreNamePredicate) predicate).getStoreName())) : - lsStream(new Path(this.storeRootDir)).map(FileStatus::getPath); + Stream<Path> stores = predicate instanceof StoreNamePredicate ? Stream + .of(new Path(this.storeRootDir, ((StoreNamePredicate) predicate).getStoreName())) + : lsStream(new Path(this.storeRootDir)).map(FileStatus::getPath); if (stores == null) { return Lists.newArrayList(); http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/b39bf8ca/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/JobContext.java ---------------------------------------------------------------------- diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/JobContext.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/JobContext.java index d8fbe4e..2ed8711 100644 --- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/JobContext.java +++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/JobContext.java @@ -149,10 +149,10 @@ public class JobContext implements Closeable { State jobPropsState = new State(); jobPropsState.addAll(jobProps); - this.jobState = - new JobState(jobPropsState, this.datasetStateStore.getLatestDatasetStatesByUrns(this.jobName), this.jobName, - this.jobId); + + this.jobState = new JobState(jobPropsState, this.jobName, this.jobId); this.jobState.setBroker(this.jobBroker); + this.jobState.setWorkUnitAndDatasetStateFunctional(new CombinedWorkUnitAndDatasetStateGenerator(this.datasetStateStore, this.jobName)); stagingDirProvided = this.jobState.contains(ConfigurationKeys.WRITER_STAGING_DIR); outputDirProvided = this.jobState.contains(ConfigurationKeys.WRITER_OUTPUT_DIR); @@ -189,10 +189,8 @@ public class JobContext implements Closeable { if (!stateStoreEnabled) { stateStoreType = ConfigurationKeys.STATE_STORE_TYPE_NOOP; } else { - stateStoreType = ConfigUtils - .getString(jobConfig, ConfigurationKeys.DATASET_STATE_STORE_TYPE_KEY, - ConfigUtils.getString(jobConfig, ConfigurationKeys.STATE_STORE_TYPE_KEY, - ConfigurationKeys.DEFAULT_STATE_STORE_TYPE)); + stateStoreType = ConfigUtils.getString(jobConfig, ConfigurationKeys.DATASET_STATE_STORE_TYPE_KEY, ConfigUtils + .getString(jobConfig, ConfigurationKeys.STATE_STORE_TYPE_KEY, ConfigurationKeys.DEFAULT_STATE_STORE_TYPE)); } ClassAliasResolver<DatasetStateStore.Factory> resolver = new ClassAliasResolver<>(DatasetStateStore.Factory.class); @@ -228,8 +226,8 @@ public class JobContext implements Closeable { Preconditions.checkState(this.jobState.contains(FsCommitSequenceStore.GOBBLIN_RUNTIME_COMMIT_SEQUENCE_STORE_DIR)); try (FileSystem fs = FileSystem.get(URI.create(this.jobState - .getProp(FsCommitSequenceStore.GOBBLIN_RUNTIME_COMMIT_SEQUENCE_STORE_FS_URI, - ConfigurationKeys.LOCAL_FS_URI)), HadoopUtils.getConfFromState(this.jobState))) { + .getProp(FsCommitSequenceStore.GOBBLIN_RUNTIME_COMMIT_SEQUENCE_STORE_FS_URI, ConfigurationKeys.LOCAL_FS_URI)), + HadoopUtils.getConfFromState(this.jobState))) { return Optional.<CommitSequenceStore>of(new FsCommitSequenceStore(fs, new Path(this.jobState.getProp(FsCommitSequenceStore.GOBBLIN_RUNTIME_COMMIT_SEQUENCE_STORE_DIR)))); @@ -311,21 +309,24 @@ public class JobContext implements Closeable { // Add jobId to writer staging dir if (this.jobState.contains(ConfigurationKeys.WRITER_STAGING_DIR)) { String writerStagingDirWithJobId = - new Path(getJobDir(this.jobState.getProp(ConfigurationKeys.WRITER_STAGING_DIR), this.getJobName()), this.jobId).toString(); + new Path(getJobDir(this.jobState.getProp(ConfigurationKeys.WRITER_STAGING_DIR), this.getJobName()), + this.jobId).toString(); this.jobState.setProp(ConfigurationKeys.WRITER_STAGING_DIR, writerStagingDirWithJobId); } // Add jobId to writer output dir if (this.jobState.contains(ConfigurationKeys.WRITER_OUTPUT_DIR)) { String writerOutputDirWithJobId = - new Path(getJobDir(this.jobState.getProp(ConfigurationKeys.WRITER_OUTPUT_DIR), this.getJobName()), this.jobId).toString(); + new Path(getJobDir(this.jobState.getProp(ConfigurationKeys.WRITER_OUTPUT_DIR), this.getJobName()), this.jobId) + .toString(); this.jobState.setProp(ConfigurationKeys.WRITER_OUTPUT_DIR, writerOutputDirWithJobId); } // Add jobId to task data root dir if (this.jobState.contains(ConfigurationKeys.TASK_DATA_ROOT_DIR_KEY)) { String taskDataRootDirWithJobId = - new Path(getJobDir(this.jobState.getProp(ConfigurationKeys.TASK_DATA_ROOT_DIR_KEY), this.getJobName()), this.jobId).toString(); + new Path(getJobDir(this.jobState.getProp(ConfigurationKeys.TASK_DATA_ROOT_DIR_KEY), this.getJobName()), + this.jobId).toString(); this.jobState.setProp(ConfigurationKeys.TASK_DATA_ROOT_DIR_KEY, taskDataRootDirWithJobId); setTaskStagingDir(); @@ -347,8 +348,8 @@ public class JobContext implements Closeable { ConfigurationKeys.WRITER_STAGING_DIR, ConfigurationKeys.TASK_DATA_ROOT_DIR_KEY)); } else { String workingDir = this.jobState.getProp(ConfigurationKeys.TASK_DATA_ROOT_DIR_KEY); - this.jobState.setProp(ConfigurationKeys.WRITER_STAGING_DIR, - new Path(workingDir, TASK_STAGING_DIR_NAME).toString()); + this.jobState + .setProp(ConfigurationKeys.WRITER_STAGING_DIR, new Path(workingDir, TASK_STAGING_DIR_NAME).toString()); LOG.info(String.format("Writer Staging Directory is set to %s.", this.jobState.getProp(ConfigurationKeys.WRITER_STAGING_DIR))); } @@ -367,8 +368,8 @@ public class JobContext implements Closeable { } else { String workingDir = this.jobState.getProp(ConfigurationKeys.TASK_DATA_ROOT_DIR_KEY); this.jobState.setProp(ConfigurationKeys.WRITER_OUTPUT_DIR, new Path(workingDir, TASK_OUTPUT_DIR_NAME).toString()); - LOG.info(String.format("Writer Output Directory is set to %s.", - this.jobState.getProp(ConfigurationKeys.WRITER_OUTPUT_DIR))); + LOG.info(String + .format("Writer Output Directory is set to %s.", this.jobState.getProp(ConfigurationKeys.WRITER_OUTPUT_DIR))); } } http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/b39bf8ca/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/JobState.java ---------------------------------------------------------------------- diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/JobState.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/JobState.java index 6c16de1..47356fc 100644 --- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/JobState.java +++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/JobState.java @@ -27,6 +27,7 @@ import java.util.List; import java.util.Map; import java.util.Properties; +import org.apache.gobblin.metastore.DatasetStateStore; import org.apache.hadoop.io.Text; import com.codahale.metrics.Counter; @@ -131,6 +132,7 @@ public class JobState extends SourceState { private final Map<String, TaskState> taskStates = Maps.newLinkedHashMap(); // Skipped task states shouldn't be exposed to publisher, but they need to be in JobState and DatasetState so that they can be written to StateStore. private final Map<String, TaskState> skippedTaskStates = Maps.newLinkedHashMap(); + private DatasetStateStore datasetStateStore; // Necessary for serialization/deserialization public JobState() { @@ -142,6 +144,13 @@ public class JobState extends SourceState { this.setId(jobId); } + public JobState(State properties,String jobName, String jobId) { + super(properties); + this.jobName = jobName; + this.jobId = jobId; + this.setId(jobId); + } + public JobState(State properties, Map<String, JobState.DatasetState> previousDatasetStates, String jobName, String jobId) { super(properties, previousDatasetStates, workUnitStatesFromDatasetStates(previousDatasetStates.values())); @@ -728,7 +737,7 @@ public class JobState extends SourceState { return datasetState; } - private static List<WorkUnitState> workUnitStatesFromDatasetStates(Iterable<JobState.DatasetState> datasetStates) { + public static List<WorkUnitState> workUnitStatesFromDatasetStates(Iterable<JobState.DatasetState> datasetStates) { ImmutableList.Builder<WorkUnitState> taskStateBuilder = ImmutableList.builder(); for (JobState datasetState : datasetStates) { taskStateBuilder.addAll(datasetState.getTaskStatesAsWorkUnitStates()); http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/b39bf8ca/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/MysqlDatasetStateStore.java ---------------------------------------------------------------------- diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/MysqlDatasetStateStore.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/MysqlDatasetStateStore.java index 22a9718..309e4c5 100644 --- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/MysqlDatasetStateStore.java +++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/MysqlDatasetStateStore.java @@ -102,7 +102,7 @@ public class MysqlDatasetStateStore extends MysqlStateStore<JobState.DatasetStat public JobState.DatasetState getLatestDatasetState(String storeName, String datasetUrn) throws IOException { String alias = Strings.isNullOrEmpty(datasetUrn) ? CURRENT_DATASET_STATE_FILE_SUFFIX + DATASET_STATE_STORE_TABLE_SUFFIX - : datasetUrn + "-" + CURRENT_DATASET_STATE_FILE_SUFFIX + DATASET_STATE_STORE_TABLE_SUFFIX; + : CharMatcher.is(':').replaceFrom(datasetUrn, '.') + "-" + CURRENT_DATASET_STATE_FILE_SUFFIX + DATASET_STATE_STORE_TABLE_SUFFIX; return get(storeName, alias, datasetUrn); }
