This is an automated email from the ASF dual-hosted git repository.
zihanli58 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 712770aa8 [GOBBLIN-1741] Create manifest based dataset finder (#3598)
712770aa8 is described below
commit 712770aa8bfab7075b062f30058c860d39ab4a14
Author: Zihan Li <[email protected]>
AuthorDate: Wed Nov 30 10:30:39 2022 -0800
[GOBBLIN-1741] Create manifest based dataset finder (#3598)
* address comments
* use connectionmanager when httpclient is not closeable
* [GOBBLIN-1741] Create manifest based dataset finder
* improve logging
* add unit test
* Add unit test for manifest dataset to test JSON parser
* address comments
* update log info
* remove unused property and change comments
Co-authored-by: Zihan Li <[email protected]>
---
.../gobblin/data/management/copy/CopyableFile.java | 2 +-
.../data/management/copy/ManifestBasedDataset.java | 138 +++++++++++++++++++++
.../copy/ManifestBasedDatasetFinder.java | 61 +++++++++
.../dataset/ManifestBasedDatasetFinderTest.java | 94 ++++++++++++++
.../manifestBasedDistcpTest/sampleManifest.json | 14 +++
.../java/org/apache/gobblin/util/PathUtils.java | 2 +
6 files changed, 310 insertions(+), 1 deletion(-)
diff --git
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopyableFile.java
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopyableFile.java
index fd04fa333..6c7b77288 100644
---
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopyableFile.java
+++
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopyableFile.java
@@ -352,7 +352,7 @@ public class CopyableFile extends CopyEntity implements
File {
List<OwnerAndPermission> ownerAndPermissions = Lists.newArrayList();
Path currentPath = fromPath;
- while (PathUtils.isAncestor(toPath, currentPath.getParent())) {
+ while (currentPath.getParent() != null && PathUtils.isAncestor(toPath,
currentPath.getParent())) {
ownerAndPermissions.add(resolveReplicatedOwnerAndPermission(sourceFs,
currentPath, copyConfiguration));
currentPath = currentPath.getParent();
}
diff --git
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/ManifestBasedDataset.java
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/ManifestBasedDataset.java
new file mode 100644
index 000000000..9b7a56fc9
--- /dev/null
+++
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/ManifestBasedDataset.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.data.management.copy;
+
+import com.google.common.base.Optional;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.gson.Gson;
+import com.google.gson.JsonIOException;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonSyntaxException;
+import com.google.gson.stream.JsonReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Properties;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.gobblin.commit.CommitStep;
+import org.apache.gobblin.data.management.copy.entities.PrePublishStep;
+import org.apache.gobblin.data.management.partition.FileSet;
+import org.apache.gobblin.util.commit.DeleteFileCommitStep;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+
+/**
+ * A dataset based on a manifest file. The manifest is expected to list all the files of this dataset as a JSON
+ * array of objects, each carrying at least a {@code fileName} entry.
+ * In this first phase, only copying across different clusters to the same location is supported (renaming may be
+ * supported in the future).
+ * A file that is listed in the manifest but does not exist on the source is deleted from the target when
+ * {@link #DELETE_FILE_NOT_EXIST_ON_SOURCE} is set to true.
+ */
+@Slf4j
+public class ManifestBasedDataset implements IterableCopyableDataset {
+
+  private static final String DELETE_FILE_NOT_EXIST_ON_SOURCE = ManifestBasedDatasetFinder.CONFIG_PREFIX + ".deleteFileNotExistOnSource";
+  // Gson instances are documented as thread-safe, so one shared parser is enough for all datasets.
+  private static final Gson GSON = new Gson();
+
+  private final FileSystem fs;
+  private final Path manifestPath;
+  private final Properties properties;
+  private final boolean deleteFileThatNotExistOnSource;
+
+  /**
+   * @param fs source file system holding the manifest and the files it lists
+   * @param manifestPath path of the JSON manifest file on {@code fs}; also used as the dataset URN
+   * @param properties job properties; read for {@link #DELETE_FILE_NOT_EXIST_ON_SOURCE} and passed to the delete step
+   */
+  public ManifestBasedDataset(final FileSystem fs, Path manifestPath, Properties properties) {
+    this.fs = fs;
+    this.manifestPath = manifestPath;
+    this.properties = properties;
+    this.deleteFileThatNotExistOnSource =
+        Boolean.parseBoolean(properties.getProperty(DELETE_FILE_NOT_EXIST_ON_SOURCE, "false"));
+  }
+
+  /** The dataset URN is the string form of the manifest path. */
+  @Override
+  public String datasetURN() {
+    return this.manifestPath.toString();
+  }
+
+  /**
+   * Reads the manifest and returns a single {@link FileSet} for this dataset. A listed file is copied when it exists
+   * on the source and is either absent from {@code targetFs} or has a newer modification time on the source.
+   * When {@link #DELETE_FILE_NOT_EXIST_ON_SOURCE} is enabled, listed files that are missing on the source but present
+   * on the target are scheduled for deletion through a {@link PrePublishStep}.
+   *
+   * @param targetFs destination file system
+   * @param configuration copy configuration used to build each {@link CopyableFile}
+   * @return an iterator over exactly one file set
+   * @throws IOException if the manifest does not exist, is a directory, is not JSON of the expected shape,
+   *     or cannot be processed
+   */
+  @Override
+  public Iterator<FileSet<CopyEntity>> getFileSetIterator(FileSystem targetFs, CopyConfiguration configuration)
+      throws IOException {
+    if (!fs.exists(manifestPath)) {
+      throw new IOException(String.format("Manifest path %s does not exist on filesystem %s, skipping this manifest"
+          + ", probably due to wrong configuration of %s", manifestPath.toString(), fs.getUri().toString(),
+          ManifestBasedDatasetFinder.MANIFEST_LOCATION));
+    } else if (fs.getFileStatus(manifestPath).isDirectory()) {
+      throw new IOException(String.format("Manifest path %s on filesystem %s is a directory, which is not supported. "
+          + "Please set the manifest file locations in %s, you can specify multiple locations separated by ','",
+          manifestPath.toString(), fs.getUri().toString(), ManifestBasedDatasetFinder.MANIFEST_LOCATION));
+    }
+    List<CopyEntity> copyEntities = Lists.newArrayList();
+    List<FileStatus> toDelete = Lists.newArrayList();
+    //todo: put permission preserve logic here?
+    // try-with-resources guarantees the reader (and the underlying stream) is closed even if parsing fails part-way;
+    // endArray() is only invoked once the whole array has been consumed, so it can no longer mask a parse error.
+    try (JsonReader reader = new JsonReader(new InputStreamReader(fs.open(manifestPath), "UTF-8"))) {
+      reader.beginArray();
+      while (reader.hasNext()) {
+        //todo: We can use fileSet to partition the data in case of some softbound issue
+        //todo: After partition, change this to directly return iterator so that we can save time if we meet resources limitation
+        JsonObject file = GSON.fromJson(reader, JsonObject.class);
+        if (!file.has("fileName") || file.get("fileName").isJsonNull()) {
+          // Surface a schema error (handled below) instead of an opaque NullPointerException.
+          throw new JsonSyntaxException("Manifest entry is missing required field 'fileName': " + file);
+        }
+        Path fileToCopy = new Path(file.get("fileName").getAsString());
+        if (this.fs.exists(fileToCopy)) {
+          FileStatus sourceStatus = this.fs.getFileStatus(fileToCopy);
+          if (!targetFs.exists(fileToCopy) || shouldCopy(sourceStatus, targetFs.getFileStatus(fileToCopy))) {
+            CopyableFile copyableFile =
+                CopyableFile.fromOriginAndDestination(this.fs, sourceStatus, fileToCopy, configuration)
+                    .fileSet(datasetURN())
+                    .datasetOutputPath(fileToCopy.toString())
+                    .ancestorsOwnerAndPermission(CopyableFile.resolveReplicatedOwnerAndPermissionsRecursively(
+                        this.fs, fileToCopy.getParent(), new Path("/"), configuration))
+                    .build();
+            copyableFile.setFsDatasets(this.fs, targetFs);
+            copyEntities.add(copyableFile);
+          }
+        } else if (this.deleteFileThatNotExistOnSource && targetFs.exists(fileToCopy)) {
+          toDelete.add(targetFs.getFileStatus(fileToCopy));
+        }
+      }
+      reader.endArray();
+      if (this.deleteFileThatNotExistOnSource) {
+        //todo: add support sync for empty dir
+        CommitStep step = new DeleteFileCommitStep(targetFs, toDelete, this.properties, Optional.<Path>absent());
+        copyEntities.add(new PrePublishStep(datasetURN(), Maps.newHashMap(), step, 1));
+      }
+    } catch (JsonIOException | JsonSyntaxException e) {
+      //todo: update error message to point to a sample json file instead of schema which is hard to understand
+      log.warn(String.format("Failed to read Manifest path %s on filesystem %s, please make sure it's in correct json format with schema"
+          + " {type:array, items:{type: object, properties:{id:{type:String}, fileName:{type:String}, fileGroup:{type:String}, fileSizeInBytes: {type:Long}}}}",
+          manifestPath.toString(), fs.getUri().toString()), e);
+      throw new IOException(e);
+    } catch (Exception e) {
+      log.warn(String.format("Failed to process Manifest path %s on filesystem %s, due to", manifestPath.toString(), fs.getUri().toString()), e);
+      throw new IOException(e);
+    }
+    return Collections.singleton(new FileSet.Builder<>(datasetURN(), this).add(copyEntities).build()).iterator();
+  }
+
+  /**
+   * Decides whether a file that exists on both sides must be copied again.
+   *
+   * @return true when the source copy has a strictly newer modification time than the target copy
+   */
+  private static boolean shouldCopy(FileStatus fileInSource, FileStatus fileInTarget) {
+    //todo: make the rule configurable
+    return fileInSource.getModificationTime() > fileInTarget.getModificationTime();
+  }
+}
diff --git
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/ManifestBasedDatasetFinder.java
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/ManifestBasedDatasetFinder.java
new file mode 100644
index 000000000..5e8f2a68d
--- /dev/null
+++
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/ManifestBasedDatasetFinder.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.data.management.copy;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Splitter;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Properties;
+import java.util.stream.Collectors;
+import org.apache.gobblin.dataset.IterableDatasetFinder;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+
+/**
+ * Finds {@link ManifestBasedDataset}s from the comma-separated manifest file paths configured under
+ * {@link #MANIFEST_LOCATION}. Each configured manifest path becomes one dataset.
+ */
+public class ManifestBasedDatasetFinder implements IterableDatasetFinder<ManifestBasedDataset> {
+
+  public static final String CONFIG_PREFIX = CopyConfiguration.COPY_PREFIX + ".manifestBased";
+  public static final String MANIFEST_LOCATION = CONFIG_PREFIX + ".manifest.location";
+
+  private final FileSystem fs;
+  private final List<Path> manifestLocations;
+  private final Properties properties;
+
+  /**
+   * @param fs file system on which the manifests live; handed to every created dataset
+   * @param properties job properties; must contain {@link #MANIFEST_LOCATION}
+   * @throws IllegalArgumentException if {@link #MANIFEST_LOCATION} is missing or contains no non-blank path
+   */
+  public ManifestBasedDatasetFinder(final FileSystem fs, Properties properties) {
+    Preconditions.checkArgument(properties.containsKey(MANIFEST_LOCATION),
+        "Manifest location key required in config. Please set " + MANIFEST_LOCATION);
+    this.fs = fs;
+    this.properties = properties;
+    this.manifestLocations = new ArrayList<>();
+    // omitEmptyStrings() tolerates stray separators (e.g. a trailing ',') instead of failing on new Path("").
+    for (String location : Splitter.on(',').trimResults().omitEmptyStrings()
+        .split(properties.getProperty(MANIFEST_LOCATION))) {
+      this.manifestLocations.add(new Path(location));
+    }
+    Preconditions.checkArgument(!this.manifestLocations.isEmpty(),
+        "No manifest location found in config " + MANIFEST_LOCATION);
+  }
+
+  /** Creates one {@link ManifestBasedDataset} per configured manifest location, in configuration order. */
+  @Override
+  public List<ManifestBasedDataset> findDatasets() throws IOException {
+    return this.manifestLocations.stream()
+        .map(location -> new ManifestBasedDataset(this.fs, location, this.properties))
+        .collect(Collectors.toList());
+  }
+
+  /** Manifests may reference arbitrary absolute paths, so the common root is the file system root. */
+  @Override
+  public Path commonDatasetRoot() {
+    return new Path("/");
+  }
+
+  /** Iterates over the same datasets as {@link #findDatasets()}. */
+  @Override
+  public Iterator<ManifestBasedDataset> getDatasetsIterator() throws IOException {
+    return findDatasets().iterator();
+  }
+}
diff --git
a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/dataset/ManifestBasedDatasetFinderTest.java
b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/dataset/ManifestBasedDatasetFinderTest.java
new file mode 100644
index 000000000..95ab31c34
--- /dev/null
+++
b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/dataset/ManifestBasedDatasetFinderTest.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.data.management.dataset;
+
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Properties;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.data.management.copy.CopyConfiguration;
+import org.apache.gobblin.data.management.copy.CopyEntity;
+import org.apache.gobblin.data.management.copy.ManifestBasedDataset;
+import org.apache.gobblin.data.management.copy.ManifestBasedDatasetFinder;
+import org.apache.gobblin.data.management.partition.FileSet;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.mockito.Mockito;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import static org.mockito.Matchers.*;
+
+
+/**
+ * Tests for {@link ManifestBasedDatasetFinder} and {@link ManifestBasedDataset}, driven by the bundled sample manifest.
+ */
+public class ManifestBasedDatasetFinderTest {
+  private static final String SAMPLE_MANIFEST_RESOURCE = "manifestBasedDistcpTest/sampleManifest.json";
+
+  private FileSystem localFs;
+
+  public ManifestBasedDatasetFinderTest() throws IOException {
+    localFs = FileSystem.getLocal(new Configuration());
+  }
+
+  /** Resolves the sample manifest test resource to its local path string. */
+  private String sampleManifestLocation() {
+    return getClass().getClassLoader().getResource(SAMPLE_MANIFEST_RESOURCE).getPath();
+  }
+
+  /** A single configured manifest location must produce exactly one dataset. */
+  @Test
+  public void testFindDataset() throws IOException {
+    Properties config = new Properties();
+    config.setProperty("gobblin.copy.manifestBased.manifest.location", sampleManifestLocation());
+
+    List<ManifestBasedDataset> found = new ManifestBasedDatasetFinder(localFs, config).findDatasets();
+
+    Assert.assertEquals(found.size(), 1);
+  }
+
+  /**
+   * Every file listed in the sample manifest should be selected for copy when the source reports all paths as
+   * existing and the target reports none.
+   */
+  @Test
+  public void testFindFiles() throws IOException, URISyntaxException {
+    Path manifest = new Path(sampleManifestLocation());
+    Properties config = new Properties();
+    config.setProperty(ConfigurationKeys.DATA_PUBLISHER_FINAL_DIR, "/");
+
+    try (FileSystem destFs = Mockito.mock(FileSystem.class); FileSystem sourceFs = Mockito.mock(FileSystem.class)) {
+      Mockito.when(sourceFs.getUri()).thenReturn(new URI("source", "the.source.org", "/", null));
+      Mockito.when(destFs.getUri()).thenReturn(new URI("dest", "the.dest.org", "/", null));
+      // Every source path "exists" and reports the manifest's own (non-directory) status; nothing exists on target.
+      Mockito.when(sourceFs.getFileStatus(any(Path.class))).thenReturn(localFs.getFileStatus(manifest));
+      Mockito.when(sourceFs.exists(any(Path.class))).thenReturn(true);
+      Mockito.when(sourceFs.open(manifest)).thenReturn(localFs.open(manifest));
+      Mockito.when(destFs.exists(any(Path.class))).thenReturn(false);
+      Mockito.doAnswer(invocation -> localFs.makeQualified((Path) invocation.getArguments()[0]))
+          .when(sourceFs).makeQualified(any(Path.class));
+
+      ManifestBasedDataset dataset = new ManifestBasedDataset(sourceFs, manifest, config);
+      CopyConfiguration copyConfig = CopyConfiguration.builder(destFs, config).build();
+      Iterator<FileSet<CopyEntity>> fileSets = dataset.getFileSetIterator(destFs, copyConfig);
+
+      Assert.assertTrue(fileSets.hasNext());
+      Assert.assertEquals(fileSets.next().getFiles().size(), 2);
+    }
+  }
+}
\ No newline at end of file
diff --git
a/gobblin-data-management/src/test/resources/manifestBasedDistcpTest/sampleManifest.json
b/gobblin-data-management/src/test/resources/manifestBasedDistcpTest/sampleManifest.json
new file mode 100644
index 000000000..fb8f0d736
--- /dev/null
+++
b/gobblin-data-management/src/test/resources/manifestBasedDistcpTest/sampleManifest.json
@@ -0,0 +1,14 @@
+[
+ {
+ "id":"1",
+ "fileName":"/tmp/dataset/test1.txt",
+ "fileGroup":"/tmp/dataset",
+ "fileSizeInBytes":"1024"
+ },
+ {
+ "id":"2",
+ "fileName":"/tmp/dataset/test2.txt",
+ "fileGroup":"/tmp/dataset1",
+ "fileSizeInBytes":"1028"
+ }
+]
\ No newline at end of file
diff --git
a/gobblin-utility/src/main/java/org/apache/gobblin/util/PathUtils.java
b/gobblin-utility/src/main/java/org/apache/gobblin/util/PathUtils.java
index 28a9114cc..660cd7875 100644
--- a/gobblin-utility/src/main/java/org/apache/gobblin/util/PathUtils.java
+++ b/gobblin-utility/src/main/java/org/apache/gobblin/util/PathUtils.java
@@ -212,6 +212,8 @@ public class PathUtils {
log.info("Deleted empty directory " + startPath);
}
deleteEmptyParentDirectories(fs, limitPath, startPath.getParent());
+ } else {
+ log.info(String.format("%s is not ancestor of %s, will not delete %s in
this case", limitPath, startPath, startPath));
}
}