This is an automated email from the ASF dual-hosted git repository.

zihanli58 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 712770aa8 [GOBBLIN-1741] Create manifest based dataset finder (#3598)
712770aa8 is described below

commit 712770aa8bfab7075b062f30058c860d39ab4a14
Author: Zihan Li <[email protected]>
AuthorDate: Wed Nov 30 10:30:39 2022 -0800

    [GOBBLIN-1741] Create manifest based dataset finder (#3598)
    
    * address comments
    
    * use connectionmanager when httpclient is not closeable
    
    * [GOBBLIN-1741] Create manifest based dataset finder
    
    * improve logging
    
    * add unit test
    
    * Add unit test for manifest dataset to test JSON parser
    
    * address comments
    
    * update log info
    
    * remove unused property and change comments
    
    Co-authored-by: Zihan Li <[email protected]>
---
 .../gobblin/data/management/copy/CopyableFile.java |   2 +-
 .../data/management/copy/ManifestBasedDataset.java | 138 +++++++++++++++++++++
 .../copy/ManifestBasedDatasetFinder.java           |  61 +++++++++
 .../dataset/ManifestBasedDatasetFinderTest.java    |  94 ++++++++++++++
 .../manifestBasedDistcpTest/sampleManifest.json    |  14 +++
 .../java/org/apache/gobblin/util/PathUtils.java    |   2 +
 6 files changed, 310 insertions(+), 1 deletion(-)

diff --git 
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopyableFile.java
 
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopyableFile.java
index fd04fa333..6c7b77288 100644
--- 
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopyableFile.java
+++ 
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopyableFile.java
@@ -352,7 +352,7 @@ public class CopyableFile extends CopyEntity implements 
File {
     List<OwnerAndPermission> ownerAndPermissions = Lists.newArrayList();
     Path currentPath = fromPath;
 
-    while (PathUtils.isAncestor(toPath, currentPath.getParent())) {
+    while (currentPath.getParent() != null && PathUtils.isAncestor(toPath, 
currentPath.getParent())) {
       ownerAndPermissions.add(resolveReplicatedOwnerAndPermission(sourceFs, 
currentPath, copyConfiguration));
       currentPath = currentPath.getParent();
     }
diff --git 
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/ManifestBasedDataset.java
 
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/ManifestBasedDataset.java
new file mode 100644
index 000000000..9b7a56fc9
--- /dev/null
+++ 
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/ManifestBasedDataset.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.data.management.copy;
+
+import com.google.common.base.Optional;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.gson.Gson;
+import com.google.gson.JsonElement;
+import com.google.gson.JsonIOException;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonSyntaxException;
+import com.google.gson.stream.JsonReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.nio.charset.StandardCharsets;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Properties;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.gobblin.commit.CommitStep;
+import org.apache.gobblin.data.management.copy.entities.PrePublishStep;
+import org.apache.gobblin.data.management.partition.FileSet;
+import org.apache.gobblin.util.commit.DeleteFileCommitStep;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+
+/**
+ * A dataset backed by a manifest file. The manifest is expected to list every file that belongs to this dataset.
+ * In this first phase only copying across clusters to the same location is supported (renaming may be added later).
+ * When the {@link ManifestBasedDatasetFinder#CONFIG_PREFIX}{@code .deleteFileNotExistOnSource} property is true,
+ * a file that is listed in the manifest, does not exist on the source, but does exist on the target is deleted
+ * from the target.
+ */
+@Slf4j
+public class ManifestBasedDataset implements IterableCopyableDataset {
+
+  private static final String DELETE_FILE_NOT_EXIST_ON_SOURCE = ManifestBasedDatasetFinder.CONFIG_PREFIX + ".deleteFileNotExistOnSource";
+  /** Manifest entry key holding the path of a file to copy. */
+  private static final String FILE_NAME_KEY = "fileName";
+  // Gson instances are immutable and thread-safe, so a single shared instance is enough.
+  private static final Gson GSON = new Gson();
+
+  private final FileSystem fs;
+  private final Path manifestPath;
+  private final Properties properties;
+  private final boolean deleteFileThatNotExistOnSource;
+
+  /**
+   * @param fs source filesystem holding both the manifest and the files it lists
+   * @param manifestPath path of the manifest file on {@code fs}
+   * @param properties job properties; also handed to the delete commit step
+   */
+  public ManifestBasedDataset(final FileSystem fs, Path manifestPath, Properties properties) {
+    this.fs = fs;
+    this.manifestPath = manifestPath;
+    this.properties = properties;
+    this.deleteFileThatNotExistOnSource = Boolean.parseBoolean(properties.getProperty(DELETE_FILE_NOT_EXIST_ON_SOURCE, "false"));
+  }
+
+  /** The manifest path doubles as the dataset URN and as the file-set name. */
+  @Override
+  public String datasetURN() {
+    return this.manifestPath.toString();
+  }
+
+  /**
+   * Reads the manifest and returns a single {@link FileSet} containing a {@link CopyableFile} for every listed file
+   * that exists on the source and is either missing on {@code targetFs} or newer on the source. When deletion is
+   * enabled, the file set also carries a {@link PrePublishStep} that deletes listed files present only on the target.
+   *
+   * @throws IOException if the manifest is missing, is a directory, is malformed, or cannot be processed
+   */
+  @Override
+  public Iterator<FileSet<CopyEntity>> getFileSetIterator(FileSystem targetFs, CopyConfiguration configuration)
+      throws IOException {
+    if (!fs.exists(manifestPath)) {
+      throw new IOException(String.format("Manifest path %s does not exist on filesystem %s, skipping this manifest"
+          + ", probably due to wrong configuration of %s", manifestPath.toString(), fs.getUri().toString(), ManifestBasedDatasetFinder.MANIFEST_LOCATION));
+    } else if (fs.getFileStatus(manifestPath).isDirectory()) {
+      throw new IOException(String.format("Manifest path %s on filesystem %s is a directory, which is not supported. Please set the manifest file locations in "
+          + "%s, you can specify multi locations split by ','", manifestPath.toString(), fs.getUri().toString(), ManifestBasedDatasetFinder.MANIFEST_LOCATION));
+    }
+    List<CopyEntity> copyEntities = Lists.newArrayList();
+    List<FileStatus> toDelete = Lists.newArrayList();
+    //todo: put permission preserve logic here?
+    // try-with-resources closes the manifest stream even when parsing fails part way through.
+    try (JsonReader reader = new JsonReader(new InputStreamReader(fs.open(manifestPath), StandardCharsets.UTF_8))) {
+      reader.beginArray();
+      while (reader.hasNext()) {
+        //todo: We can use fileSet to partition the data in case of some softbound issue
+        //todo: After partition, change this to directly return iterator so that we can save time if we meet resources limitation
+        Path fileToCopy = readFilePath(GSON.fromJson(reader, JsonObject.class));
+        if (this.fs.exists(fileToCopy)) {
+          FileStatus sourceStatus = this.fs.getFileStatus(fileToCopy);
+          if (!targetFs.exists(fileToCopy) || shouldCopy(sourceStatus, targetFs.getFileStatus(fileToCopy))) {
+            CopyableFile copyableFile =
+                CopyableFile.fromOriginAndDestination(this.fs, sourceStatus, fileToCopy, configuration)
+                    .fileSet(datasetURN())
+                    .datasetOutputPath(fileToCopy.toString())
+                    .ancestorsOwnerAndPermission(CopyableFile
+                        .resolveReplicatedOwnerAndPermissionsRecursively(this.fs, fileToCopy.getParent(),
+                            new Path("/"), configuration))
+                    .build();
+            copyableFile.setFsDatasets(this.fs, targetFs);
+            copyEntities.add(copyableFile);
+          }
+        } else if (this.deleteFileThatNotExistOnSource && targetFs.exists(fileToCopy)) {
+          toDelete.add(targetFs.getFileStatus(fileToCopy));
+        }
+      }
+      // Consume the closing bracket only after a successful read: on a partially-read manifest endArray()
+      // would itself throw and hide the real error.
+      reader.endArray();
+      if (this.deleteFileThatNotExistOnSource) {
+        //todo: add support sync for empty dir
+        CommitStep step = new DeleteFileCommitStep(targetFs, toDelete, this.properties, Optional.<Path>absent());
+        copyEntities.add(new PrePublishStep(datasetURN(), Maps.newHashMap(), step, 1));
+      }
+    } catch (JsonIOException | JsonSyntaxException e) {
+      //todo: update error message to point to a sample json file instead of schema which is hard to understand
+      log.warn(String.format("Failed to read Manifest path %s on filesystem %s, please make sure it's in correct json format with schema"
+          + " {type:array, items:{type: object, properties:{id:{type:String}, fileName:{type:String}, fileGroup:{type:String}, fileSizeInBytes: {type:Long}}}}",
+          manifestPath.toString(), fs.getUri().toString()), e);
+      throw new IOException(e);
+    } catch (Exception e) {
+      log.warn(String.format("Failed to process Manifest path %s on filesystem %s, due to", manifestPath.toString(), fs.getUri().toString()), e);
+      throw new IOException(e);
+    }
+    return Collections.singleton(new FileSet.Builder<>(datasetURN(), this).add(copyEntities).build()).iterator();
+  }
+
+  /**
+   * Extracts the path to copy from one manifest entry.
+   *
+   * @throws JsonSyntaxException if the entry is null or lacks a primitive {@code fileName} value, so the caller
+   *     reports it together with the expected manifest schema
+   */
+  private static Path readFilePath(JsonObject entry) {
+    JsonElement fileName = entry == null ? null : entry.get(FILE_NAME_KEY);
+    if (fileName == null || !fileName.isJsonPrimitive()) {
+      throw new JsonSyntaxException("Manifest entry has no string '" + FILE_NAME_KEY + "' field: " + entry);
+    }
+    return new Path(fileName.getAsString());
+  }
+
+  /** Returns true when the source file was modified strictly later than the target file. */
+  private static boolean shouldCopy(FileStatus fileInSource, FileStatus fileInTarget) {
+    //todo: make the rule configurable
+    return fileInSource.getModificationTime() > fileInTarget.getModificationTime();
+  }
+}
diff --git 
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/ManifestBasedDatasetFinder.java
 
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/ManifestBasedDatasetFinder.java
new file mode 100644
index 000000000..5e8f2a68d
--- /dev/null
+++ 
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/ManifestBasedDatasetFinder.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.data.management.copy;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Splitter;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Properties;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import org.apache.gobblin.dataset.IterableDatasetFinder;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+
+/**
+ * Finds one {@link ManifestBasedDataset} per manifest file listed under {@link #MANIFEST_LOCATION}.
+ * Locations are comma separated; surrounding whitespace and empty entries are ignored.
+ */
+public class ManifestBasedDatasetFinder implements IterableDatasetFinder<ManifestBasedDataset> {
+
+  public static final String CONFIG_PREFIX = CopyConfiguration.COPY_PREFIX + ".manifestBased";
+  public static final String MANIFEST_LOCATION = CONFIG_PREFIX + ".manifest.location";
+  private final FileSystem fs;
+  private final List<Path> manifestLocations;
+  private final Properties properties;
+
+  /**
+   * @param fs filesystem passed to every dataset created by this finder (manifests are read from it)
+   * @param properties job properties; must contain {@link #MANIFEST_LOCATION}
+   * @throws IllegalArgumentException if {@link #MANIFEST_LOCATION} is missing or contains no location
+   */
+  public ManifestBasedDatasetFinder(final FileSystem fs, Properties properties) {
+    Preconditions.checkArgument(properties.containsKey(MANIFEST_LOCATION), "Manifest location key required in config. Please set " + MANIFEST_LOCATION);
+    this.fs = fs;
+    this.properties = properties;
+    List<Path> locations = new ArrayList<>();
+    // omitEmptyStrings() skips blanks produced by stray or trailing commas; new Path("") would throw.
+    for (String location : Splitter.on(',').trimResults().omitEmptyStrings().split(properties.getProperty(MANIFEST_LOCATION))) {
+      locations.add(new Path(location));
+    }
+    Preconditions.checkArgument(!locations.isEmpty(), "No manifest location found in %s", MANIFEST_LOCATION);
+    this.manifestLocations = Collections.unmodifiableList(locations);
+  }
+
+  @Override
+  public List<ManifestBasedDataset> findDatasets() throws IOException {
+    return datasets().collect(Collectors.toList());
+  }
+
+  /** Manifests are not restricted to a common directory, so the filesystem root is returned. */
+  @Override
+  public Path commonDatasetRoot() {
+    return new Path("/");
+  }
+
+  @Override
+  public Iterator<ManifestBasedDataset> getDatasetsIterator() throws IOException {
+    return datasets().iterator();
+  }
+
+  /** Lazily creates one dataset per configured manifest location. */
+  private Stream<ManifestBasedDataset> datasets() {
+    return manifestLocations.stream().map(location -> new ManifestBasedDataset(fs, location, properties));
+  }
+}
diff --git 
a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/dataset/ManifestBasedDatasetFinderTest.java
 
b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/dataset/ManifestBasedDatasetFinderTest.java
new file mode 100644
index 000000000..95ab31c34
--- /dev/null
+++ 
b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/dataset/ManifestBasedDatasetFinderTest.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.data.management.dataset;
+
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Properties;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.data.management.copy.CopyConfiguration;
+import org.apache.gobblin.data.management.copy.CopyEntity;
+import org.apache.gobblin.data.management.copy.ManifestBasedDataset;
+import org.apache.gobblin.data.management.copy.ManifestBasedDatasetFinder;
+import org.apache.gobblin.data.management.partition.FileSet;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.mockito.Mockito;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import static org.mockito.Matchers.*;
+
+
+/**
+ * Tests for {@link ManifestBasedDatasetFinder} and {@link ManifestBasedDataset}, driven by the
+ * {@code manifestBasedDistcpTest/sampleManifest.json} test resource.
+ */
+public class ManifestBasedDatasetFinderTest {
+  private static final String SAMPLE_MANIFEST_RESOURCE = "manifestBasedDistcpTest/sampleManifest.json";
+
+  private final FileSystem localFs;
+
+  public ManifestBasedDatasetFinderTest() throws IOException {
+    this.localFs = FileSystem.getLocal(new Configuration());
+  }
+
+  /** Returns the local filesystem location of the bundled sample manifest. */
+  private String sampleManifestLocation() {
+    return getClass().getClassLoader().getResource(SAMPLE_MANIFEST_RESOURCE).getPath();
+  }
+
+  @Test
+  public void testFindDataset() throws IOException {
+    // A single configured manifest location must yield exactly one dataset.
+    Properties config = new Properties();
+    config.setProperty("gobblin.copy.manifestBased.manifest.location", sampleManifestLocation());
+
+    List<ManifestBasedDataset> found = new ManifestBasedDatasetFinder(localFs, config).findDatasets();
+    Assert.assertEquals(found.size(), 1);
+  }
+
+  @Test
+  public void testFindFiles() throws IOException, URISyntaxException {
+    Path manifest = new Path(sampleManifestLocation());
+
+    Properties config = new Properties();
+    config.setProperty(ConfigurationKeys.DATA_PUBLISHER_FINAL_DIR, "/");
+
+    try (FileSystem destFs = Mockito.mock(FileSystem.class); FileSystem sourceFs = Mockito.mock(FileSystem.class)) {
+      URI sourceUri = new URI("source", "the.source.org", "/", null);
+      URI destUri = new URI("dest", "the.dest.org", "/", null);
+      Mockito.when(sourceFs.getUri()).thenReturn(sourceUri);
+      Mockito.when(destFs.getUri()).thenReturn(destUri);
+      // Every path "exists" on the source (reporting the manifest's own status) and nothing exists on the
+      // target, so every manifest entry must turn into a copy entity.
+      Mockito.when(sourceFs.getFileStatus(any(Path.class))).thenReturn(localFs.getFileStatus(manifest));
+      Mockito.when(sourceFs.exists(any(Path.class))).thenReturn(true);
+      Mockito.when(sourceFs.open(manifest)).thenReturn(localFs.open(manifest));
+      Mockito.when(destFs.exists(any(Path.class))).thenReturn(false);
+      Mockito.doAnswer(invocation -> localFs.makeQualified((Path) invocation.getArguments()[0]))
+          .when(sourceFs).makeQualified(any(Path.class));
+
+      ManifestBasedDataset dataset = new ManifestBasedDataset(sourceFs, manifest, config);
+      CopyConfiguration copyConfig = CopyConfiguration.builder(destFs, config).build();
+      Iterator<FileSet<CopyEntity>> fileSets = dataset.getFileSetIterator(destFs, copyConfig);
+
+      Assert.assertTrue(fileSets.hasNext());
+      // The sample manifest lists two files.
+      Assert.assertEquals(fileSets.next().getFiles().size(), 2);
+    }
+  }
+}
\ No newline at end of file
diff --git 
a/gobblin-data-management/src/test/resources/manifestBasedDistcpTest/sampleManifest.json
 
b/gobblin-data-management/src/test/resources/manifestBasedDistcpTest/sampleManifest.json
new file mode 100644
index 000000000..fb8f0d736
--- /dev/null
+++ 
b/gobblin-data-management/src/test/resources/manifestBasedDistcpTest/sampleManifest.json
@@ -0,0 +1,14 @@
+[
+  {
+    "id":"1",
+    "fileName":"/tmp/dataset/test1.txt",
+    "fileGroup":"/tmp/dataset",
+    "fileSizeInBytes":"1024"
+  },
+  {
+    "id":"2",
+    "fileName":"/tmp/dataset/test2.txt",
+    "fileGroup":"/tmp/dataset1",
+    "fileSizeInBytes":"1028"
+  }
+]
\ No newline at end of file
diff --git 
a/gobblin-utility/src/main/java/org/apache/gobblin/util/PathUtils.java 
b/gobblin-utility/src/main/java/org/apache/gobblin/util/PathUtils.java
index 28a9114cc..660cd7875 100644
--- a/gobblin-utility/src/main/java/org/apache/gobblin/util/PathUtils.java
+++ b/gobblin-utility/src/main/java/org/apache/gobblin/util/PathUtils.java
@@ -212,6 +212,8 @@ public class PathUtils {
         log.info("Deleted empty directory " + startPath);
       }
       deleteEmptyParentDirectories(fs, limitPath, startPath.getParent());
+    } else {
+      log.info(String.format("%s is not ancestor of %s, will not delete %s in 
this case", limitPath, startPath, startPath));
     }
   }
 

Reply via email to