Repository: incubator-gobblin Updated Branches: refs/heads/master 280b1d35e -> b54e2818d
[GOBBLIN-210] Implemented two abstract sources based on dataset finder Closes #2063 from ibuenros/datasetfinder-source Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/b54e2818 Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/b54e2818 Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/b54e2818 Branch: refs/heads/master Commit: b54e2818d2b0861a019dafd0ea62b83f701152ee Parents: 280b1d3 Author: ibuenros <[email protected]> Authored: Thu Aug 17 14:40:44 2017 -0700 Committer: Issac Buenrostro <[email protected]> Committed: Thu Aug 17 14:40:44 2017 -0700 ---------------------------------------------------------------------- .../org/apache/gobblin/dataset/Dataset.java | 12 +- .../gobblin/dataset/IterableDatasetFinder.java | 24 ++ .../gobblin/dataset/PartitionableDataset.java | 61 +++++ .../apache/gobblin/dataset/URNIdentified.java | 29 +++ .../URNLexicographicalComparator.java | 53 +++++ .../dataset/comparators/package-info.java | 4 + .../dataset/test/SimpleDatasetForTesting.java | 38 ++++ .../test/SimpleDatasetPartitionForTesting.java | 35 +++ .../SimplePartitionableDatasetForTesting.java | 56 +++++ .../test/StaticDatasetsFinderForTesting.java | 61 +++++ .../source/workunit/BasicWorkUnitStream.java | 13 +- .../management/copy/CloseableFsCopySource.java | 2 +- .../data/management/copy/CopySource.java | 14 +- .../data/management/dataset/DatasetUtils.java | 9 + .../management/source/DatasetFinderSource.java | 141 ++++++++++++ .../source/LoopingDatasetFinderSource.java | 226 +++++++++++++++++++ .../source/DatasetFinderSourceTest.java | 137 +++++++++++ .../source/LoopingDatasetFinderSourceTest.java | 218 ++++++++++++++++++ .../apache/gobblin/runtime/task/NoopTask.java | 60 +++++ .../org/apache/gobblin/util/HadoopUtils.java | 20 ++ 20 files changed, 1204 insertions(+), 9 deletions(-) 
---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/b54e2818/gobblin-api/src/main/java/org/apache/gobblin/dataset/Dataset.java ---------------------------------------------------------------------- diff --git a/gobblin-api/src/main/java/org/apache/gobblin/dataset/Dataset.java b/gobblin-api/src/main/java/org/apache/gobblin/dataset/Dataset.java index fb8c1fa..abc225f 100644 --- a/gobblin-api/src/main/java/org/apache/gobblin/dataset/Dataset.java +++ b/gobblin-api/src/main/java/org/apache/gobblin/dataset/Dataset.java @@ -20,9 +20,17 @@ package org.apache.gobblin.dataset; /** * Interface representing a dataset. */ -public interface Dataset { +public interface Dataset extends URNIdentified { + /** - * Deepest {@link org.apache.hadoop.fs.Path} that contains all files in the dataset. + * URN for this dataset. + * @deprecated use {@link #getUrn()} */ + @Deprecated public String datasetURN(); + + @Override + default String getUrn() { + return datasetURN(); + } } http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/b54e2818/gobblin-api/src/main/java/org/apache/gobblin/dataset/IterableDatasetFinder.java ---------------------------------------------------------------------- diff --git a/gobblin-api/src/main/java/org/apache/gobblin/dataset/IterableDatasetFinder.java b/gobblin-api/src/main/java/org/apache/gobblin/dataset/IterableDatasetFinder.java index 93f9586..a842c3c 100644 --- a/gobblin-api/src/main/java/org/apache/gobblin/dataset/IterableDatasetFinder.java +++ b/gobblin-api/src/main/java/org/apache/gobblin/dataset/IterableDatasetFinder.java @@ -18,7 +18,11 @@ package org.apache.gobblin.dataset; import java.io.IOException; +import java.util.Comparator; import java.util.Iterator; +import java.util.Spliterators; +import java.util.stream.Stream; +import java.util.stream.StreamSupport; /** @@ -30,7 +34,27 @@ public interface IterableDatasetFinder<T extends Dataset> extends DatasetsFinder /** 
* @return An {@link Iterator} over the {@link Dataset}s found. * @throws IOException + * @deprecated use {@link #getDatasetsStream} instead. */ + @Deprecated public Iterator<T> getDatasetsIterator() throws IOException; + /** + * Get a stream of {@link Dataset}s found. + * @param desiredCharacteristics desired {@link java.util.Spliterator} characteristics of this stream. The returned + * stream need not satisfy these characteristics, this argument merely implies that the + * caller will run optimally when those characteristics are present, allowing pushdown of + * those characteristics. For example {@link java.util.Spliterator#SORTED} can sometimes + * be pushed down at a cost, so the {@link DatasetsFinder} would only push it down if it is valuable + * for the caller. + * @param suggestedOrder suggested order of the datasets in the stream. Implementation may or may not return the entries + * in that order. If the entries are in that order, implementation should ensure the spliterator + * is annotated as such. + * @return a stream of {@link Dataset}s found. + * @throws IOException + */ + default Stream<T> getDatasetsStream(int desiredCharacteristics, Comparator<T> suggestedOrder) throws IOException { + return StreamSupport.stream(Spliterators.spliteratorUnknownSize(getDatasetsIterator(), 0), false); + } + } http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/b54e2818/gobblin-api/src/main/java/org/apache/gobblin/dataset/PartitionableDataset.java ---------------------------------------------------------------------- diff --git a/gobblin-api/src/main/java/org/apache/gobblin/dataset/PartitionableDataset.java b/gobblin-api/src/main/java/org/apache/gobblin/dataset/PartitionableDataset.java new file mode 100644 index 0000000..06e1ec0 --- /dev/null +++ b/gobblin-api/src/main/java/org/apache/gobblin/dataset/PartitionableDataset.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.dataset; + +import java.io.IOException; +import java.util.Comparator; +import java.util.stream.Stream; + + +/** + * A {@link Dataset} that can be partitioned into disjoint subsets of the dataset. + * @param <T> the type of partitions returned by the dataset. + */ +public interface PartitionableDataset<T extends PartitionableDataset.DatasetPartition> extends Dataset { + + /** + * Get a stream of partitions. + * @param desiredCharacteristics desired {@link java.util.Spliterator} characteristics of this stream. The returned + * stream need not satisfy these characteristics, this argument merely implies that the + * caller will run optimally when those characteristics are present, allowing pushdown of + * those characteristics. For example {@link java.util.Spliterator#SORTED} can sometimes + * be pushed down at a cost, so the {@link Dataset} would only push it down if it is valuable + * for the caller. + * @param suggestedOrder suggested order of the partitions in the stream. Implementation may or may not return the entries + * in that order. If the entries are in that order, implementation should ensure the spliterator + * is annotated as such. + * @return a {@link Stream} over {@link DatasetPartition}s in this dataset. 
+ */ + Stream<T> getPartitions(int desiredCharacteristics, Comparator<T> suggestedOrder) throws IOException; + + /** + * A partition of a {@link PartitionableDataset}. + */ + interface DatasetPartition extends URNIdentified { + /** + * URN for this dataset. + */ + String getUrn(); + + /** + * @return Dataset this partition belongs to. + */ + Dataset getDataset(); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/b54e2818/gobblin-api/src/main/java/org/apache/gobblin/dataset/URNIdentified.java ---------------------------------------------------------------------- diff --git a/gobblin-api/src/main/java/org/apache/gobblin/dataset/URNIdentified.java b/gobblin-api/src/main/java/org/apache/gobblin/dataset/URNIdentified.java new file mode 100644 index 0000000..b6d5137 --- /dev/null +++ b/gobblin-api/src/main/java/org/apache/gobblin/dataset/URNIdentified.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.dataset; + +/** + * An object that can be identified by URN. 
+ * Note the contract is that given o1, o2, then o1.equals(o2) iff o1.class.equals(o2.class) and o1.getUrn().equals(o2.getUrn()) + */ +public interface URNIdentified { + /** + * URN for this object. + */ + public String getUrn(); +} http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/b54e2818/gobblin-api/src/main/java/org/apache/gobblin/dataset/comparators/URNLexicographicalComparator.java ---------------------------------------------------------------------- diff --git a/gobblin-api/src/main/java/org/apache/gobblin/dataset/comparators/URNLexicographicalComparator.java b/gobblin-api/src/main/java/org/apache/gobblin/dataset/comparators/URNLexicographicalComparator.java new file mode 100644 index 0000000..bef0461 --- /dev/null +++ b/gobblin-api/src/main/java/org/apache/gobblin/dataset/comparators/URNLexicographicalComparator.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.dataset.comparators; + +import java.io.Serializable; +import java.util.Comparator; + +import org.apache.gobblin.dataset.URNIdentified; + +import lombok.EqualsAndHashCode; + + +/** + * Dataset comparator that compares by dataset urn. 
+ */ +@EqualsAndHashCode +public class URNLexicographicalComparator implements Comparator<URNIdentified>, Serializable { + private static final long serialVersionUID = 2647543651352156568L; + + @Override + public int compare(URNIdentified o1, URNIdentified o2) { + return o1.getUrn().compareTo(o2.getUrn()); + } + + /** + * Compare against a raw URN. + */ + public int compare(URNIdentified o1, String urn2) { + return o1.getUrn().compareTo(urn2); + } + + /** + * Compare against a raw URN. + */ + public int compare(String urn1, URNIdentified o2) { + return urn1.compareTo(o2.getUrn()); + } +} http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/b54e2818/gobblin-api/src/main/java/org/apache/gobblin/dataset/comparators/package-info.java ---------------------------------------------------------------------- diff --git a/gobblin-api/src/main/java/org/apache/gobblin/dataset/comparators/package-info.java b/gobblin-api/src/main/java/org/apache/gobblin/dataset/comparators/package-info.java new file mode 100644 index 0000000..fc8fe18 --- /dev/null +++ b/gobblin-api/src/main/java/org/apache/gobblin/dataset/comparators/package-info.java @@ -0,0 +1,4 @@ +/** + * Contains common dataset orders that {@link org.apache.gobblin.dataset.DatasetsFinder}s can push down. 
+ */ +package org.apache.gobblin.dataset.comparators; http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/b54e2818/gobblin-api/src/main/java/org/apache/gobblin/dataset/test/SimpleDatasetForTesting.java ---------------------------------------------------------------------- diff --git a/gobblin-api/src/main/java/org/apache/gobblin/dataset/test/SimpleDatasetForTesting.java b/gobblin-api/src/main/java/org/apache/gobblin/dataset/test/SimpleDatasetForTesting.java new file mode 100644 index 0000000..1136ef4 --- /dev/null +++ b/gobblin-api/src/main/java/org/apache/gobblin/dataset/test/SimpleDatasetForTesting.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.dataset.test; + +import org.apache.gobblin.dataset.Dataset; + +import lombok.AllArgsConstructor; +import lombok.EqualsAndHashCode; + + +/** + * A dumb {@link Dataset} used for testing. 
+ */ +@AllArgsConstructor +@EqualsAndHashCode +public class SimpleDatasetForTesting implements Dataset { + private final String urn; + + @Override + public String datasetURN() { + return this.urn; + } +} http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/b54e2818/gobblin-api/src/main/java/org/apache/gobblin/dataset/test/SimpleDatasetPartitionForTesting.java ---------------------------------------------------------------------- diff --git a/gobblin-api/src/main/java/org/apache/gobblin/dataset/test/SimpleDatasetPartitionForTesting.java b/gobblin-api/src/main/java/org/apache/gobblin/dataset/test/SimpleDatasetPartitionForTesting.java new file mode 100644 index 0000000..92624d9 --- /dev/null +++ b/gobblin-api/src/main/java/org/apache/gobblin/dataset/test/SimpleDatasetPartitionForTesting.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.dataset.test; + +import org.apache.gobblin.dataset.Dataset; +import org.apache.gobblin.dataset.PartitionableDataset; + +import lombok.Data; +import lombok.EqualsAndHashCode; + + +/** + * A dumb {@link org.apache.gobblin.dataset.PartitionableDataset.DatasetPartition} used for testing. 
+ */ +@Data +@EqualsAndHashCode +public class SimpleDatasetPartitionForTesting implements PartitionableDataset.DatasetPartition { + private final String urn; + private Dataset dataset; +} http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/b54e2818/gobblin-api/src/main/java/org/apache/gobblin/dataset/test/SimplePartitionableDatasetForTesting.java ---------------------------------------------------------------------- diff --git a/gobblin-api/src/main/java/org/apache/gobblin/dataset/test/SimplePartitionableDatasetForTesting.java b/gobblin-api/src/main/java/org/apache/gobblin/dataset/test/SimplePartitionableDatasetForTesting.java new file mode 100644 index 0000000..849e080 --- /dev/null +++ b/gobblin-api/src/main/java/org/apache/gobblin/dataset/test/SimplePartitionableDatasetForTesting.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.gobblin.dataset.test; + +import java.io.IOException; +import java.util.Comparator; +import java.util.List; +import java.util.stream.Stream; + +import org.apache.gobblin.dataset.PartitionableDataset; + +import lombok.EqualsAndHashCode; + + +/** + * A {@link PartitionableDataset} that just returns a predefined set of {@link SimpleDatasetPartitionForTesting} used for testing. + */ +@EqualsAndHashCode +public class SimplePartitionableDatasetForTesting implements PartitionableDataset<SimpleDatasetPartitionForTesting> { + private final String urn; + private final List<SimpleDatasetPartitionForTesting> partitions; + + public SimplePartitionableDatasetForTesting(String urn, List<SimpleDatasetPartitionForTesting> partitions) { + this.urn = urn; + this.partitions = partitions; + for (SimpleDatasetPartitionForTesting partition : this.partitions) { + partition.setDataset(this); + } + } + + @Override + public String datasetURN() { + return this.urn; + } + + @Override + public Stream<SimpleDatasetPartitionForTesting> getPartitions(int desiredCharacteristics, + Comparator<SimpleDatasetPartitionForTesting> suggestedOrder) throws IOException { + return this.partitions.stream(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/b54e2818/gobblin-api/src/main/java/org/apache/gobblin/dataset/test/StaticDatasetsFinderForTesting.java ---------------------------------------------------------------------- diff --git a/gobblin-api/src/main/java/org/apache/gobblin/dataset/test/StaticDatasetsFinderForTesting.java b/gobblin-api/src/main/java/org/apache/gobblin/dataset/test/StaticDatasetsFinderForTesting.java new file mode 100644 index 0000000..71f6add --- /dev/null +++ b/gobblin-api/src/main/java/org/apache/gobblin/dataset/test/StaticDatasetsFinderForTesting.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.dataset.test; + +import java.io.IOException; +import java.util.Comparator; +import java.util.Iterator; +import java.util.List; +import java.util.stream.Stream; + +import org.apache.gobblin.dataset.Dataset; +import org.apache.gobblin.dataset.IterableDatasetFinder; +import org.apache.hadoop.fs.Path; + +import lombok.AllArgsConstructor; + + +/** + * A {@link org.apache.gobblin.dataset.DatasetsFinder} that returns a predefined set of {@link Dataset}s for testing. 
+ */ +@AllArgsConstructor +public class StaticDatasetsFinderForTesting implements IterableDatasetFinder<Dataset> { + + private final List<Dataset> datasets; + + @Override + public List<Dataset> findDatasets() throws IOException { + return this.datasets; + } + + @Override + public Path commonDatasetRoot() { + return null; + } + + @Override + public Iterator<Dataset> getDatasetsIterator() throws IOException { + return this.datasets.iterator(); + } + + @Override + public Stream<Dataset> getDatasetsStream(int desiredCharacteristics, Comparator<Dataset> suggestedOrder) + throws IOException { + return this.datasets.stream(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/b54e2818/gobblin-api/src/main/java/org/apache/gobblin/source/workunit/BasicWorkUnitStream.java ---------------------------------------------------------------------- diff --git a/gobblin-api/src/main/java/org/apache/gobblin/source/workunit/BasicWorkUnitStream.java b/gobblin-api/src/main/java/org/apache/gobblin/source/workunit/BasicWorkUnitStream.java index 0d07312..86c813e 100644 --- a/gobblin-api/src/main/java/org/apache/gobblin/source/workunit/BasicWorkUnitStream.java +++ b/gobblin-api/src/main/java/org/apache/gobblin/source/workunit/BasicWorkUnitStream.java @@ -28,7 +28,6 @@ import com.google.common.collect.Iterators; import com.google.common.collect.Lists; import lombok.Getter; -import lombok.Setter; /** @@ -120,9 +119,7 @@ public class BasicWorkUnitStream implements WorkUnitStream { public static class Builder { private Iterator<WorkUnit> workUnits; private List<WorkUnit> workUnitList; - @Setter private boolean finiteStream = true; - @Setter private boolean safeToMaterialize = false; @@ -136,6 +133,16 @@ public class BasicWorkUnitStream implements WorkUnitStream { this.finiteStream = true; } + public Builder setFiniteStream(boolean finiteStream) { + this.finiteStream = finiteStream; + return this; + } + + public Builder setSafeToMaterialize(boolean safeToMaterialize) { + 
this.safeToMaterialize = safeToMaterialize; + return this; + } + public WorkUnitStream build() { return new BasicWorkUnitStream(this.workUnits, this.workUnitList, this.finiteStream, this.safeToMaterialize); } http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/b54e2818/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CloseableFsCopySource.java ---------------------------------------------------------------------- diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CloseableFsCopySource.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CloseableFsCopySource.java index f4cf4fa..d71d590 100644 --- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CloseableFsCopySource.java +++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CloseableFsCopySource.java @@ -59,7 +59,7 @@ public class CloseableFsCopySource extends CopySource { @Override protected FileSystem getSourceFileSystem(State state) throws IOException { - return this.closer.register(super.getSourceFileSystem(state)); + return this.closer.register(HadoopUtils.getSourceFileSystem(state)); } @Override http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/b54e2818/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopySource.java ---------------------------------------------------------------------- diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopySource.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopySource.java index 5dd7f85..f60e5f0 100644 --- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopySource.java +++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopySource.java @@ -145,8 +145,8 @@ public class CopySource extends AbstractSource<String, 
FileAwareInputStream> { DeprecationUtils.renameDeprecatedKeys(state, CopyConfiguration.MAX_COPY_PREFIX + "." + CopyResourcePool.ENTITIES_KEY, Lists.newArrayList(MAX_FILES_COPIED_KEY)); - final FileSystem sourceFs = getSourceFileSystem(state); - final FileSystem targetFs = getTargetFileSystem(state); + final FileSystem sourceFs = HadoopUtils.getSourceFileSystem(state); + final FileSystem targetFs = HadoopUtils.getWriterFileSystem(state, 1, 0); state.setProp(SlaEventKeys.SOURCE_URI, sourceFs.getUri()); state.setProp(SlaEventKeys.DESTINATION_URI, targetFs.getUri()); @@ -325,7 +325,7 @@ public class CopySource extends AbstractSource<String, FileAwareInputStream> { if (CopyableFile.class.isAssignableFrom(copyEntityClass)) { CopyableFile copyEntity = (CopyableFile) deserializeCopyEntity(state); - return extractorForCopyableFile(getSourceFileSystem(state), copyEntity, state); + return extractorForCopyableFile(HadoopUtils.getSourceFileSystem(state), copyEntity, state); } return new EmptyExtractor<>("empty"); } @@ -339,6 +339,10 @@ public class CopySource extends AbstractSource<String, FileAwareInputStream> { public void shutdown(SourceState state) { } + /** + * @deprecated use {@link HadoopUtils#getSourceFileSystem(State)}. + */ + @Deprecated protected FileSystem getSourceFileSystem(State state) throws IOException { Configuration conf = HadoopUtils.getConfFromState(state, Optional.of(ConfigurationKeys.SOURCE_FILEBASED_ENCRYPTED_CONFIG_PATH)); @@ -346,6 +350,10 @@ public class CopySource extends AbstractSource<String, FileAwareInputStream> { return HadoopUtils.getOptionallyThrottledFileSystem(FileSystem.get(URI.create(uri), conf), state); } + /** + * @deprecated use {@link HadoopUtils#getWriterFileSystem(State, int, int)}. 
+ */ + @Deprecated private static FileSystem getTargetFileSystem(State state) throws IOException { return HadoopUtils.getOptionallyThrottledFileSystem(WriterUtils.getWriterFS(state, 1, 0), state); http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/b54e2818/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/dataset/DatasetUtils.java ---------------------------------------------------------------------- diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/dataset/DatasetUtils.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/dataset/DatasetUtils.java index 2c1e954..97dd2d9 100644 --- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/dataset/DatasetUtils.java +++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/dataset/DatasetUtils.java @@ -28,6 +28,8 @@ import org.apache.hadoop.fs.PathFilter; import com.google.common.collect.Lists; +import org.apache.gobblin.dataset.IterableDatasetFinder; +import org.apache.gobblin.dataset.IterableDatasetFinderImpl; import org.apache.gobblin.data.management.copy.CopyableFile; import org.apache.gobblin.data.management.copy.CopyableFileFilter; import org.apache.gobblin.dataset.DatasetsFinder; @@ -90,6 +92,13 @@ public class DatasetUtils { } } + public static <T extends org.apache.gobblin.dataset.Dataset> IterableDatasetFinder<T> instantiateIterableDatasetFinder( + Properties props, FileSystem fs, String default_class, Object... additionalArgs) throws IOException { + DatasetsFinder<T> datasetsFinder = instantiateDatasetFinder(props, fs, default_class, additionalArgs); + return datasetsFinder instanceof IterableDatasetFinder ? (IterableDatasetFinder<T>) datasetsFinder + : new IterableDatasetFinderImpl<>(datasetsFinder); + } + /** * Instantiate a {@link PathFilter} from the class name at key {@link #PATH_FILTER_KEY} in props passed. 
If key * {@link #PATH_FILTER_KEY} is not set, a default {@link #ACCEPT_ALL_PATH_FILTER} is returned http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/b54e2818/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/source/DatasetFinderSource.java ---------------------------------------------------------------------- diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/source/DatasetFinderSource.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/source/DatasetFinderSource.java new file mode 100644 index 0000000..38fc7e2 --- /dev/null +++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/source/DatasetFinderSource.java @@ -0,0 +1,141 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
package org.apache.gobblin.data.management.source;

import java.io.IOException;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import org.apache.gobblin.configuration.SourceState;
import org.apache.gobblin.data.management.dataset.DatasetUtils;
import org.apache.gobblin.dataset.Dataset;
import org.apache.gobblin.dataset.IterableDatasetFinder;
import org.apache.gobblin.dataset.PartitionableDataset;
import org.apache.gobblin.source.WorkUnitStreamSource;
import org.apache.gobblin.source.workunit.BasicWorkUnitStream;
import org.apache.gobblin.source.workunit.WorkUnit;
import org.apache.gobblin.source.workunit.WorkUnitStream;
import org.apache.gobblin.util.HadoopUtils;

import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;


/**
 * An abstract source that uses a {@link org.apache.gobblin.dataset.DatasetsFinder} to find {@link Dataset}s and creates
 * a work unit for each one. If {@link #drilldownIntoPartitions} is set, each partition of a
 * {@link PartitionableDataset} becomes its own work unit instead.
 */
@Slf4j
public abstract class DatasetFinderSource<S, D> implements WorkUnitStreamSource<S, D> {

  protected final boolean drilldownIntoPartitions;

  /**
   * @param drilldownIntoPartitions if set to true, will process each partition of a {@link PartitionableDataset} as a
   *                                separate work unit.
   */
  public DatasetFinderSource(boolean drilldownIntoPartitions) {
    this.drilldownIntoPartitions = drilldownIntoPartitions;
  }

  /**
   * @return the {@link WorkUnit} for the input dataset.
   */
  protected abstract WorkUnit workUnitForDataset(Dataset dataset);

  /**
   * @return the {@link WorkUnit} for the input partition.
   */
  protected abstract WorkUnit workUnitForDatasetPartition(PartitionableDataset.DatasetPartition partition);

  @Override
  public List<WorkUnit> getWorkunits(SourceState state) {
    try {
      return createWorkUnitStream(state).collect(Collectors.toList());
    } catch (IOException ioe) {
      throw new RuntimeException(ioe);
    }
  }

  @Override
  public WorkUnitStream getWorkunitStream(SourceState state) {
    try {
      return new BasicWorkUnitStream.Builder(createWorkUnitStream(state).iterator()).build();
    } catch (IOException ioe) {
      throw new RuntimeException(ioe);
    }
  }

  /**
   * Can be overriden to specify a non-pluggable {@link org.apache.gobblin.dataset.DatasetsFinder}.
   * @throws IOException if the finder cannot be instantiated.
   */
  protected IterableDatasetFinder createDatasetsFinder(SourceState state) throws IOException {
    return DatasetUtils.instantiateIterableDatasetFinder(state.getProperties(),
        HadoopUtils.getSourceFileSystem(state), null);
  }

  /**
   * Builds the stream of work units: one per dataset, or one per partition when
   * {@link #drilldownIntoPartitions} is set (non-partitionable datasets are wrapped so they flow
   * through the same partition-shaped pipeline).
   */
  private Stream<WorkUnit> createWorkUnitStream(SourceState state) throws IOException {
    IterableDatasetFinder datasetsFinder = createDatasetsFinder(state);

    Stream<Dataset> datasetStream = datasetsFinder.getDatasetsStream(0, null);

    if (this.drilldownIntoPartitions) {
      return datasetStream.flatMap(dataset -> {
        if (dataset instanceof PartitionableDataset) {
          try {
            return (Stream<PartitionableDataset.DatasetPartition>) ((PartitionableDataset) dataset).getPartitions(0,
                null);
          } catch (IOException ioe) {
            // Pass the throwable to the logger: dropping it loses the stack trace and makes
            // partition-listing failures impossible to diagnose. Dataset is skipped (best effort).
            log.error("Failed to get partitions for dataset " + dataset.getUrn(), ioe);
            return Stream.empty();
          }
        } else {
          return Stream.of(new DatasetWrapper(dataset));
        }
      }).map(this::workUnitForPartitionInternal);
    } else {
      return datasetStream.map(this::workUnitForDataset);
    }
  }

  /** Routes wrapped plain datasets back to {@link #workUnitForDataset}, real partitions to the partition hook. */
  private WorkUnit workUnitForPartitionInternal(PartitionableDataset.DatasetPartition partition) {
    if (partition instanceof DatasetWrapper) {
      return workUnitForDataset(((DatasetWrapper) partition).dataset);
    } else {
      return workUnitForDatasetPartition(partition);
    }
  }

  /**
   * A wrapper around a {@link org.apache.gobblin.dataset.PartitionableDataset.DatasetPartition} that makes it look
   * like a {@link Dataset} for slightly easier to understand code.
   */
  @AllArgsConstructor
  protected static class DatasetWrapper implements PartitionableDataset.DatasetPartition {
    @Getter
    private final Dataset dataset;

    @Override
    public String getUrn() {
      return this.dataset.datasetURN();
    }
  }
}
package org.apache.gobblin.data.management.source;

import java.io.IOException;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.Spliterator;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

import org.apache.gobblin.configuration.SourceState;
import org.apache.gobblin.configuration.WorkUnitState;
import org.apache.gobblin.dataset.Dataset;
import org.apache.gobblin.dataset.IterableDatasetFinder;
import org.apache.gobblin.dataset.PartitionableDataset;
import org.apache.gobblin.dataset.URNIdentified;
import org.apache.gobblin.dataset.comparators.URNLexicographicalComparator;
import org.apache.gobblin.runtime.task.NoopTask;
import org.apache.gobblin.source.workunit.BasicWorkUnitStream;
import org.apache.gobblin.source.workunit.WorkUnit;
import org.apache.gobblin.source.workunit.WorkUnitStream;

import com.google.common.collect.AbstractIterator;
import com.google.common.collect.Iterators;
import com.google.common.collect.Lists;
import com.google.common.collect.PeekingIterator;

import javax.annotation.Nullable;
import lombok.extern.slf4j.Slf4j;


/**
 * A source that processes datasets generated by a {@link org.apache.gobblin.dataset.DatasetsFinder}, processing a few of
 * them each run, and continuing from where it left off in the next run. When it is done processing all the datasets, it
 * starts over from the beginning. The datasets are processed in lexicographical order based on URN.
 *
 * TODO: handle retries
 */
@Slf4j
public abstract class LoopingDatasetFinderSource<S, D> extends DatasetFinderSource<S, D> {

  public static final String MAX_WORK_UNITS_PER_RUN_KEY = "gobblin.source.loopingDatasetFinderSource.maxWorkUnitsPerRun";
  public static final int MAX_WORK_UNITS_PER_RUN = 10;

  private static final String DATASET_URN = "gobblin.source.loopingDatasetFinderSource.datasetUrn";
  private static final String PARTITION_URN = "gobblin.source.loopingDatasetFinderSource.partitionUrn";
  private static final String WORK_UNIT_ORDINAL = "gobblin.source.loopingDatasetFinderSource.workUnitOrdinal";
  protected static final String END_OF_DATASETS_KEY = "gobblin.source.loopingDatasetFinderSource.endOfDatasets";

  private final URNLexicographicalComparator lexicographicalComparator = new URNLexicographicalComparator();

  /**
   * @param drilldownIntoPartitions if set to true, will process each partition of a {@link PartitionableDataset} as a
   *                                separate work unit.
   */
  public LoopingDatasetFinderSource(boolean drilldownIntoPartitions) {
    super(drilldownIntoPartitions);
  }

  @Override
  public List<WorkUnit> getWorkunits(SourceState state) {
    return Lists.newArrayList(getWorkunitStream(state).getMaterializedWorkUnitCollection());
  }

  @Override
  public WorkUnitStream getWorkunitStream(SourceState state) {
    try {
      int maxWorkUnits = state.getPropAsInt(MAX_WORK_UNITS_PER_RUN_KEY, MAX_WORK_UNITS_PER_RUN);

      // The previous run's work unit with the highest ordinal tells us where to resume.
      List<WorkUnitState> previousWorkUnitStates = state.getPreviousWorkUnitStates();
      Optional<WorkUnitState> maxWorkUnit;
      try {
        maxWorkUnit = previousWorkUnitStates.stream().reduce((wu1, wu2) -> {
          int wu1Ordinal = wu1.getPropAsInt(WORK_UNIT_ORDINAL);
          int wu2Ordinal = wu2.getPropAsInt(WORK_UNIT_ORDINAL);
          return wu1Ordinal > wu2Ordinal ? wu1 : wu2;
        });
      } catch (NumberFormatException nfe) {
        throw new RuntimeException("Work units in state store are corrupted! Missing or malformed " + WORK_UNIT_ORDINAL);
      }

      // If the previous run reached the end-of-datasets marker, clear the watermarks to loop around.
      String previousDatasetUrnWatermark = null;
      String previousPartitionUrnWatermark = null;
      if (maxWorkUnit.isPresent() && !maxWorkUnit.get().getPropAsBoolean(END_OF_DATASETS_KEY, false)) {
        previousDatasetUrnWatermark = maxWorkUnit.get().getProp(DATASET_URN);
        previousPartitionUrnWatermark = maxWorkUnit.get().getProp(PARTITION_URN);
      }

      IterableDatasetFinder datasetsFinder = createDatasetsFinder(state);

      Stream<Dataset> datasetStream = datasetsFinder.getDatasetsStream(Spliterator.SORTED, this.lexicographicalComparator);
      datasetStream = sortStreamLexicographically(datasetStream);

      return new BasicWorkUnitStream.Builder(new DeepIterator(datasetStream.iterator(), previousDatasetUrnWatermark,
          previousPartitionUrnWatermark, maxWorkUnits)).setFiniteStream(true).build();
    } catch (IOException ioe) {
      throw new RuntimeException(ioe);
    }
  }

  /**
   * A deep iterator that advances input streams until the correct position, then possibly iterates over partitions
   * of {@link PartitionableDataset}s.
   */
  private class DeepIterator extends AbstractIterator<WorkUnit> {
    // The PeekingIterator wrapper is stored (not transient): advanceUntilLargerThan buffers one element via peek(),
    // and the original code discarded the wrapper afterwards, silently losing the first element past the watermark
    // whenever the watermarked URN was no longer present in the stream.
    private final PeekingIterator<Dataset> baseIterator;
    private final int maxWorkUnits;

    private Iterator<PartitionableDataset.DatasetPartition> currentPartitionIterator;
    private int generatedWorkUnits = 0;

    public DeepIterator(Iterator<Dataset> baseIterator, String previousDatasetUrnWatermark,
        String previousPartitionUrnWatermark, int maxWorkUnits) {
      this.maxWorkUnits = maxWorkUnits;
      this.baseIterator = Iterators.peekingIterator(baseIterator);

      Dataset equalDataset = advanceUntilLargerThan(this.baseIterator, previousDatasetUrnWatermark);

      // instanceof is null-safe, so no separate null check is needed.
      if (drilldownIntoPartitions && equalDataset instanceof PartitionableDataset) {
        // Resume mid-dataset: skip the partitions already processed in the previous run.
        PeekingIterator<PartitionableDataset.DatasetPartition> partitionIterator =
            Iterators.peekingIterator(getPartitionIterator((PartitionableDataset) equalDataset));
        advanceUntilLargerThan(partitionIterator, previousPartitionUrnWatermark);
        this.currentPartitionIterator = partitionIterator;
      } else {
        this.currentPartitionIterator = Collections.emptyIterator();
      }
    }

    /**
     * Advance an iterator until the next value is larger than the reference.
     * @return the last value polled if it is equal to reference, or null otherwise.
     */
    @Nullable
    private <T extends URNIdentified> T advanceUntilLargerThan(PeekingIterator<T> it, String reference) {
      if (reference == null) {
        return null;
      }

      int comparisonResult = -1;
      while (it.hasNext() && (comparisonResult = lexicographicalComparator.compare(it.peek(), reference)) < 0) {
        it.next();
      }
      return comparisonResult == 0 ? it.next() : null;
    }

    /** Lists a dataset's partitions in lexicographical URN order; on failure logs (with cause) and skips. */
    private Iterator<PartitionableDataset.DatasetPartition> getPartitionIterator(PartitionableDataset dataset) {
      try {
        return sortStreamLexicographically(
            dataset.getPartitions(Spliterator.SORTED, LoopingDatasetFinderSource.this.lexicographicalComparator))
            .iterator();
      } catch (IOException ioe) {
        // Keep the throwable: without it the stack trace is lost and the failure is undiagnosable.
        log.error("Failed to get partitions for dataset " + dataset.getUrn(), ioe);
        return Collections.emptyIterator();
      }
    }

    @Override
    protected WorkUnit computeNext() {
      if (this.generatedWorkUnits >= this.maxWorkUnits) {
        return endOfData();
      }

      while (this.baseIterator.hasNext() || this.currentPartitionIterator.hasNext()) {
        if (this.currentPartitionIterator.hasNext()) {
          PartitionableDataset.DatasetPartition partition = this.currentPartitionIterator.next();
          WorkUnit workUnit = workUnitForDatasetPartition(partition);
          addDatasetInfoToWorkUnit(workUnit, partition.getDataset(), this.generatedWorkUnits++);
          addPartitionInfoToWorkUnit(workUnit, partition);
          return workUnit;
        }

        Dataset dataset = this.baseIterator.next();
        if (drilldownIntoPartitions && dataset instanceof PartitionableDataset) {
          this.currentPartitionIterator = getPartitionIterator((PartitionableDataset) dataset);
        } else {
          WorkUnit workUnit = workUnitForDataset(dataset);
          addDatasetInfoToWorkUnit(workUnit, dataset, this.generatedWorkUnits++);
          return workUnit;
        }
      }

      // All datasets exhausted: emit a no-op marker so the next run knows to loop back to the beginning.
      WorkUnit workUnit = NoopTask.noopWorkunit();
      workUnit.setProp(WORK_UNIT_ORDINAL, this.generatedWorkUnits);

      // Guarantee no further work units are emitted after the marker.
      this.generatedWorkUnits = Integer.MAX_VALUE;

      workUnit.setProp(END_OF_DATASETS_KEY, true);
      return workUnit;
    }

    private void addDatasetInfoToWorkUnit(WorkUnit workUnit, Dataset dataset, int workUnitOrdinal) {
      workUnit.setProp(DATASET_URN, dataset.getUrn());
      workUnit.setProp(WORK_UNIT_ORDINAL, workUnitOrdinal);
    }

    private void addPartitionInfoToWorkUnit(WorkUnit workUnit, PartitionableDataset.DatasetPartition partition) {
      workUnit.setProp(PARTITION_URN, partition.getUrn());
    }
  }

  /**
   * Sort input stream lexicographically. Noop if the input stream is already sorted.
   */
  private <T extends URNIdentified> Stream<T> sortStreamLexicographically(Stream<T> inputStream) {
    Spliterator<T> spliterator = inputStream.spliterator();
    // Compare from our comparator's side: Spliterator.getComparator() returns null when the source is
    // sorted by natural ordering, so calling equals() on its result would throw NPE.
    if (spliterator.hasCharacteristics(Spliterator.SORTED)
        && this.lexicographicalComparator.equals(spliterator.getComparator())) {
      return StreamSupport.stream(spliterator, false);
    }
    return StreamSupport.stream(spliterator, false).sorted(this.lexicographicalComparator);
  }
}
package org.apache.gobblin.data.management.source;

import java.io.IOException;
import java.util.List;
import java.util.stream.Stream;

import org.apache.gobblin.configuration.SourceState;
import org.apache.gobblin.configuration.WorkUnitState;
import org.apache.gobblin.dataset.Dataset;
import org.apache.gobblin.dataset.IterableDatasetFinder;
import org.apache.gobblin.dataset.PartitionableDataset;
import org.apache.gobblin.dataset.test.SimpleDatasetForTesting;
import org.apache.gobblin.dataset.test.SimpleDatasetPartitionForTesting;
import org.apache.gobblin.dataset.test.SimplePartitionableDatasetForTesting;
import org.apache.gobblin.dataset.test.StaticDatasetsFinderForTesting;
import org.apache.gobblin.source.extractor.Extractor;
import org.apache.gobblin.source.workunit.WorkUnit;
import org.apache.gobblin.source.workunit.WorkUnitStream;
import org.testng.Assert;
import org.testng.annotations.Test;

import com.google.common.collect.Lists;


/**
 * Unit tests for {@link DatasetFinderSource}, covering both the one-work-unit-per-dataset mode and
 * the drilldown-into-partitions mode.
 */
public class DatasetFinderSourceTest {

  public static final String DATASET_URN = "test.datasetUrn";
  public static final String PARTITION_URN = "test.partitionUrn";

  @Test
  public void testNonDrilledDown() {
    IterableDatasetFinder finder = new StaticDatasetsFinderForTesting(threeDatasets());

    MySource mySource = new MySource(false, finder);
    List<WorkUnit> workUnits = mySource.getWorkunits(new SourceState());

    // Without drilldown, the partitionable dataset2 still yields a single work unit.
    Assert.assertEquals(workUnits.size(), 3);
    assertWorkUnit(workUnits.get(0), "dataset1", null);
    assertWorkUnit(workUnits.get(1), "dataset2", null);
    assertWorkUnit(workUnits.get(2), "dataset3", null);

    // The streaming API must produce exactly the same work units as the list API.
    WorkUnitStream workUnitStream = mySource.getWorkunitStream(new SourceState());
    Assert.assertEquals(Lists.newArrayList(workUnitStream.getWorkUnits()), workUnits);
  }

  @Test
  public void testDrilledDown() {
    IterableDatasetFinder finder = new StaticDatasetsFinderForTesting(threeDatasets());

    MySource mySource = new MySource(true, finder);
    List<WorkUnit> workUnits = mySource.getWorkunits(new SourceState());

    // With drilldown, dataset2 expands into its two partitions.
    Assert.assertEquals(workUnits.size(), 4);
    assertWorkUnit(workUnits.get(0), "dataset1", null);
    assertWorkUnit(workUnits.get(1), "dataset2", "p1");
    assertWorkUnit(workUnits.get(2), "dataset2", "p2");
    assertWorkUnit(workUnits.get(3), "dataset3", null);

    WorkUnitStream workUnitStream = mySource.getWorkunitStream(new SourceState());
    Assert.assertEquals(Lists.newArrayList(workUnitStream.getWorkUnits()), workUnits);
  }

  /** Two simple datasets surrounding one partitionable dataset with partitions p1 and p2. */
  private static List<Dataset> threeDatasets() {
    Dataset dataset1 = new SimpleDatasetForTesting("dataset1");
    Dataset dataset2 = new SimplePartitionableDatasetForTesting("dataset2",
        Lists.newArrayList(new SimpleDatasetPartitionForTesting("p1"), new SimpleDatasetPartitionForTesting("p2")));
    Dataset dataset3 = new SimpleDatasetForTesting("dataset3");
    return Lists.newArrayList(dataset1, dataset2, dataset3);
  }

  /** Asserts a work unit's dataset URN and partition URN (null means "no partition set"). */
  private static void assertWorkUnit(WorkUnit workUnit, String expectedDatasetUrn, String expectedPartitionUrn) {
    Assert.assertEquals(workUnit.getProp(DATASET_URN), expectedDatasetUrn);
    if (expectedPartitionUrn == null) {
      Assert.assertNull(workUnit.getProp(PARTITION_URN));
    } else {
      Assert.assertEquals(workUnit.getProp(PARTITION_URN), expectedPartitionUrn);
    }
  }

  /** Minimal concrete {@link DatasetFinderSource} that records dataset/partition URNs into work units. */
  public static class MySource extends DatasetFinderSource<String, String> {
    private final IterableDatasetFinder datasetsFinder;

    public MySource(boolean drilldownIntoPartitions, IterableDatasetFinder datasetsFinder) {
      super(drilldownIntoPartitions);
      this.datasetsFinder = datasetsFinder;
    }

    @Override
    public Extractor<String, String> getExtractor(WorkUnitState state) throws IOException {
      return null;
    }

    @Override
    protected WorkUnit workUnitForDataset(Dataset dataset) {
      WorkUnit workUnit = new WorkUnit();
      workUnit.setProp(DATASET_URN, dataset.getUrn());
      return workUnit;
    }

    @Override
    protected WorkUnit workUnitForDatasetPartition(PartitionableDataset.DatasetPartition partition) {
      WorkUnit workUnit = new WorkUnit();
      workUnit.setProp(DATASET_URN, partition.getDataset().getUrn());
      workUnit.setProp(PARTITION_URN, partition.getUrn());
      return workUnit;
    }

    @Override
    public void shutdown(SourceState state) {
    }

    @Override
    protected IterableDatasetFinder createDatasetsFinder(SourceState state) throws IOException {
      return this.datasetsFinder;
    }
  }
}
package org.apache.gobblin.data.management.source;

import java.io.IOException;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import org.apache.gobblin.configuration.SourceState;
import org.apache.gobblin.configuration.WorkUnitState;
import org.apache.gobblin.dataset.Dataset;
import org.apache.gobblin.dataset.IterableDatasetFinder;
import org.apache.gobblin.dataset.PartitionableDataset;
import org.apache.gobblin.dataset.test.SimpleDatasetForTesting;
import org.apache.gobblin.dataset.test.SimpleDatasetPartitionForTesting;
import org.apache.gobblin.dataset.test.SimplePartitionableDatasetForTesting;
import org.apache.gobblin.dataset.test.StaticDatasetsFinderForTesting;
import org.apache.gobblin.source.extractor.Extractor;
import org.apache.gobblin.source.workunit.WorkUnit;
import org.apache.gobblin.source.workunit.WorkUnitStream;
import org.mockito.Mockito;
import org.testng.Assert;
import org.testng.annotations.Test;

import com.google.common.collect.Lists;


/**
 * Unit tests for {@link LoopingDatasetFinderSource}: verifies that successive runs resume from the
 * previous run's watermark and loop back to the start after the end-of-datasets marker.
 */
public class LoopingDatasetFinderSourceTest {

  @Test
  public void testNonDrilldown() {
    Dataset dataset1 = new SimpleDatasetForTesting("dataset1");
    Dataset dataset2 = new SimplePartitionableDatasetForTesting("dataset2",
        Lists.newArrayList(new SimpleDatasetPartitionForTesting("p1"), new SimpleDatasetPartitionForTesting("p2")));
    Dataset dataset3 = new SimpleDatasetForTesting("dataset3");
    Dataset dataset4 = new SimpleDatasetForTesting("dataset4");
    Dataset dataset5 = new SimpleDatasetForTesting("dataset5");

    // Deliberately unsorted input: the source must order datasets lexicographically by URN.
    IterableDatasetFinder finder =
        new StaticDatasetsFinderForTesting(Lists.newArrayList(dataset5, dataset4, dataset3, dataset2, dataset1));

    MySource mySource = new MySource(false, finder);

    SourceState sourceState = new SourceState();
    sourceState.setProp(LoopingDatasetFinderSource.MAX_WORK_UNITS_PER_RUN_KEY, 3);

    // First run: the first three datasets.
    List<WorkUnit> workUnits = Lists.newArrayList(mySource.getWorkunitStream(sourceState).getWorkUnits());
    Assert.assertEquals(workUnits.size(), 3);
    assertWorkUnit(workUnits.get(0), "dataset1", null);
    assertWorkUnit(workUnits.get(1), "dataset2", null);
    assertWorkUnit(workUnits.get(2), "dataset3", null);

    // Second run should continue where it left off.
    SourceState sourceStateSpy = Mockito.spy(sourceState);
    simulatePreviousRun(sourceStateSpy, workUnits);
    workUnits = Lists.newArrayList(mySource.getWorkunitStream(sourceStateSpy).getWorkUnits());
    Assert.assertEquals(workUnits.size(), 3);
    assertWorkUnit(workUnits.get(0), "dataset4", null);
    assertWorkUnit(workUnits.get(1), "dataset5", null);
    Assert.assertTrue(workUnits.get(2).getPropAsBoolean(LoopingDatasetFinderSource.END_OF_DATASETS_KEY));

    // Third run: the end-of-datasets marker causes a loop back to the beginning.
    simulatePreviousRun(sourceStateSpy, workUnits);
    workUnits = Lists.newArrayList(mySource.getWorkunitStream(sourceStateSpy).getWorkUnits());
    Assert.assertEquals(workUnits.size(), 3);
    assertWorkUnit(workUnits.get(0), "dataset1", null);
    assertWorkUnit(workUnits.get(1), "dataset2", null);
    assertWorkUnit(workUnits.get(2), "dataset3", null);
  }

  @Test
  public void testDrilldown() {
    // Create three datasets, two of them partitioned.
    Dataset dataset1 = new SimpleDatasetForTesting("dataset1");
    Dataset dataset2 = new SimplePartitionableDatasetForTesting("dataset2",
        Lists.newArrayList(new SimpleDatasetPartitionForTesting("p1"),
            new SimpleDatasetPartitionForTesting("p2"), new SimpleDatasetPartitionForTesting("p3")));
    Dataset dataset3 = new SimplePartitionableDatasetForTesting("dataset3",
        Lists.newArrayList(new SimpleDatasetPartitionForTesting("p1"),
            new SimpleDatasetPartitionForTesting("p2"), new SimpleDatasetPartitionForTesting("p3")));

    IterableDatasetFinder finder =
        new StaticDatasetsFinderForTesting(Lists.newArrayList(dataset3, dataset2, dataset1));

    MySource mySource = new MySource(true, finder);

    // Limit to 3 wunits per run.
    SourceState sourceState = new SourceState();
    sourceState.setProp(LoopingDatasetFinderSource.MAX_WORK_UNITS_PER_RUN_KEY, 3);

    // First run: the first three work units.
    List<WorkUnit> workUnits = Lists.newArrayList(mySource.getWorkunitStream(sourceState).getWorkUnits());
    Assert.assertEquals(workUnits.size(), 3);
    assertWorkUnit(workUnits.get(0), "dataset1", null);
    assertWorkUnit(workUnits.get(1), "dataset2", "p1");
    assertWorkUnit(workUnits.get(2), "dataset2", "p2");

    // Second run should continue where it left off, mid-dataset.
    SourceState sourceStateSpy = Mockito.spy(sourceState);
    simulatePreviousRun(sourceStateSpy, workUnits);
    workUnits = Lists.newArrayList(mySource.getWorkunitStream(sourceStateSpy).getWorkUnits());
    Assert.assertEquals(workUnits.size(), 3);
    assertWorkUnit(workUnits.get(0), "dataset2", "p3");
    assertWorkUnit(workUnits.get(1), "dataset3", "p1");
    assertWorkUnit(workUnits.get(2), "dataset3", "p2");

    // Third run: last partition followed by the end-of-datasets marker.
    simulatePreviousRun(sourceStateSpy, workUnits);
    workUnits = Lists.newArrayList(mySource.getWorkunitStream(sourceStateSpy).getWorkUnits());
    Assert.assertEquals(workUnits.size(), 2);
    assertWorkUnit(workUnits.get(0), "dataset3", "p3");
    Assert.assertTrue(workUnits.get(1).getPropAsBoolean(LoopingDatasetFinderSource.END_OF_DATASETS_KEY));

    // Fourth run: finished all work units, loop around.
    simulatePreviousRun(sourceStateSpy, workUnits);
    workUnits = Lists.newArrayList(mySource.getWorkunitStream(sourceStateSpy).getWorkUnits());
    Assert.assertEquals(workUnits.size(), 3);
    assertWorkUnit(workUnits.get(0), "dataset1", null);
    assertWorkUnit(workUnits.get(1), "dataset2", "p1");
    assertWorkUnit(workUnits.get(2), "dataset2", "p2");
  }

  /** Stubs the spied state so the given work units appear as the previous run's output. */
  private static void simulatePreviousRun(SourceState spiedState, List<WorkUnit> previousWorkUnits) {
    List<WorkUnitState> previousStates =
        previousWorkUnits.stream().map(WorkUnitState::new).collect(Collectors.toList());
    Mockito.doReturn(previousStates).when(spiedState).getPreviousWorkUnitStates();
  }

  /** Asserts a work unit's dataset URN and partition URN (null means "no partition set"). */
  private static void assertWorkUnit(WorkUnit workUnit, String expectedDatasetUrn, String expectedPartitionUrn) {
    Assert.assertEquals(workUnit.getProp(DatasetFinderSourceTest.DATASET_URN), expectedDatasetUrn);
    if (expectedPartitionUrn == null) {
      Assert.assertNull(workUnit.getProp(DatasetFinderSourceTest.PARTITION_URN));
    } else {
      Assert.assertEquals(workUnit.getProp(DatasetFinderSourceTest.PARTITION_URN), expectedPartitionUrn);
    }
  }

  /** Minimal concrete {@link LoopingDatasetFinderSource} that records dataset/partition URNs into work units. */
  public static class MySource extends LoopingDatasetFinderSource<String, String> {
    private final IterableDatasetFinder datasetsFinder;

    public MySource(boolean drilldownIntoPartitions, IterableDatasetFinder datasetsFinder) {
      super(drilldownIntoPartitions);
      this.datasetsFinder = datasetsFinder;
    }

    @Override
    public Extractor<String, String> getExtractor(WorkUnitState state) throws IOException {
      return null;
    }

    @Override
    protected WorkUnit workUnitForDataset(Dataset dataset) {
      WorkUnit workUnit = new WorkUnit();
      workUnit.setProp(DatasetFinderSourceTest.DATASET_URN, dataset.getUrn());
      return workUnit;
    }

    @Override
    protected WorkUnit workUnitForDatasetPartition(PartitionableDataset.DatasetPartition partition) {
      WorkUnit workUnit = new WorkUnit();
      workUnit.setProp(DatasetFinderSourceTest.DATASET_URN, partition.getDataset().getUrn());
      workUnit.setProp(DatasetFinderSourceTest.PARTITION_URN, partition.getUrn());
      return workUnit;
    }

    @Override
    public void shutdown(SourceState state) {
    }

    @Override
    protected IterableDatasetFinder createDatasetsFinder(SourceState state) throws IOException {
      return this.datasetsFinder;
    }
  }
}
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.runtime.task; + +import org.apache.gobblin.publisher.DataPublisher; +import org.apache.gobblin.publisher.NoopPublisher; +import org.apache.gobblin.runtime.JobState; +import org.apache.gobblin.runtime.TaskContext; +import org.apache.gobblin.source.workunit.WorkUnit; + + +/** + * A task that does nothing. Usually used for transferring state from one job to the next. + */ +public class NoopTask extends BaseAbstractTask { + + /** + * @return A {@link WorkUnit} that will run a {@link NoopTask}. + */ + public static WorkUnit noopWorkunit() { + WorkUnit workUnit = new WorkUnit(); + TaskUtils.setTaskFactoryClass(workUnit, Factory.class); + return workUnit; + } + + /** + * The factory for a {@link NoopTask}. 
+ */ + public static class Factory implements TaskFactory { + @Override + public TaskIFace createTask(TaskContext taskContext) { + return new NoopTask(taskContext); + } + + @Override + public DataPublisher createDataPublisher(JobState.DatasetState datasetState) { + return new NoopPublisher(datasetState); + } + } + + private NoopTask(TaskContext taskContext) { + super(taskContext); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/b54e2818/gobblin-utility/src/main/java/org/apache/gobblin/util/HadoopUtils.java ---------------------------------------------------------------------- diff --git a/gobblin-utility/src/main/java/org/apache/gobblin/util/HadoopUtils.java b/gobblin-utility/src/main/java/org/apache/gobblin/util/HadoopUtils.java index b1bd6cc..8d186a6 100644 --- a/gobblin-utility/src/main/java/org/apache/gobblin/util/HadoopUtils.java +++ b/gobblin-utility/src/main/java/org/apache/gobblin/util/HadoopUtils.java @@ -26,6 +26,7 @@ import java.io.FileNotFoundException; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; +import java.net.URI; import java.util.Collection; import java.util.List; import java.util.Map.Entry; @@ -920,4 +921,23 @@ public class HadoopUtils { public static void addGobblinSite() { Configuration.addDefaultResource("gobblin-site.xml"); } + + /** + * Get a {@link FileSystem} object for the uri specified at {@link ConfigurationKeys#SOURCE_FILEBASED_FS_URI}. 
+ * @throws IOException + */ + public static FileSystem getSourceFileSystem(State state) throws IOException { + Configuration conf = HadoopUtils.getConfFromState(state, Optional.of(ConfigurationKeys.SOURCE_FILEBASED_ENCRYPTED_CONFIG_PATH)); + String uri = state.getProp(ConfigurationKeys.SOURCE_FILEBASED_FS_URI, ConfigurationKeys.LOCAL_FS_URI); + return HadoopUtils.getOptionallyThrottledFileSystem(FileSystem.get(URI.create(uri), conf), state); + } + + /** + * Get a {@link FileSystem} object for the uri specified at {@link ConfigurationKeys#WRITER_FILE_SYSTEM_URI}. + * @throws IOException + */ + public static FileSystem getWriterFileSystem(State state, int numBranches, int branchId) + throws IOException { + return HadoopUtils.getOptionallyThrottledFileSystem(WriterUtils.getWriterFS(state, numBranches, branchId), state); + } }
