This is an automated email from the ASF dual-hosted git repository.

zihanli58 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 5598efe00 [GOBBLIN-1757]Refactor manifest, add reader/writer and iterator for efficient reading (#3618)
5598efe00 is described below

commit 5598efe0005cd128d5a57ec20bea2fa317f25b08
Author: Tom <[email protected]>
AuthorDate: Tue Dec 20 15:31:43 2022 -0800

    [GOBBLIN-1757]Refactor manifest, add reader/writer and iterator for efficient reading (#3618)
    
    * Refactor manifest, add reader/writer and iterator for efficient reading
    
    * Fix PR comments
    
    * Add apache copyright
    
    * add basic validation of copy manifest schema for the one required field and improve comments/docs
    
    * Address PR comments, trying write in a try/finally
    
    Co-authored-by: Tom McCormick <[email protected]>
---
 .../gobblin/data/management/copy/CopyManifest.java | 155 +++++++++++++++++++++
 .../data/management/copy/ManifestBasedDataset.java |  21 ++-
 .../data/management/dataset/TestCopyManifest.java  |  97 +++++++++++++
 .../missingFileNameManifest.json                   |   7 +
 4 files changed, 267 insertions(+), 13 deletions(-)

diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopyManifest.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopyManifest.java
new file mode 100644
index 000000000..133d34eab
--- /dev/null
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopyManifest.java
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.data.management.copy;
+
+import com.google.common.reflect.TypeToken;
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+import com.google.gson.stream.JsonReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.lang.reflect.Type;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Iterator;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+
+/**
+ * Copy Manifest schema and serDe for manifest based copy
+ * https://iwww.corp.linkedin.com/wiki/cf/display/ENGS/Manifest+based+distcp+runbook
+ */
+public class CopyManifest {
+  private static final String MISSING_FN_MESSAGE = "fileName cannot be null";
+  private static final Gson GSON = new GsonBuilder().setPrettyPrinting().create();
+  private static final Type CopyableUnitListType = new TypeToken<ArrayList<CopyableUnit>>(){}.getType();
+
+  public final ArrayList<CopyableUnit> _copyableUnits;
+
+  public CopyManifest() {
+    _copyableUnits = new ArrayList<>();
+  }
+
+  public CopyManifest(ArrayList<CopyableUnit> copyableUnits) {
+    _copyableUnits = copyableUnits;
+  }
+
+  /**
+   * Add a new copyable unit to a copy manifest. Used for building a manifest
+   * @param copyableUnit
+   */
+  public void add(CopyManifest.CopyableUnit copyableUnit) {
+    _copyableUnits.add(copyableUnit);
+  }
+
+  /**
+   * One item in a copy manifest
+   * Only filename is required
+   */
+  public static class CopyableUnit {
+    public final String fileName;
+    public final String fileGroup;
+    public final Long fileSizeInBytes;
+    public final Long fileModificationTime;
+
+    public CopyableUnit(String fileName, String fileGroup, Long fileSizeInBytes, Long fileModificationTime) {
+      this.fileName = fileName;
+      this.fileGroup = fileGroup;
+      this.fileSizeInBytes = fileSizeInBytes;
+      this.fileModificationTime = fileModificationTime;
+      if (this.fileName == null) {
+        throw new IllegalArgumentException(MISSING_FN_MESSAGE);
+      }
+    }
+  }
+
+  /**
+   * Note: naive read does not do validation of schema. For schema validation use CopyableUnitIterator
+   * @param fs filesystem object used for accessing the filesystem
+   * @param path path manifest file location
+   * @return a copy manifest object from the json representation at path
+   * @throws IOException
+   */
+  public static CopyManifest read(FileSystem fs, Path path) throws IOException {
+    JsonReader jsonReader = new JsonReader(new InputStreamReader(fs.open(path), "UTF-8"));
+    return new CopyManifest(GSON.fromJson(jsonReader, CopyableUnitListType));
+  }
+
+  /**
+   *
+   * @param fs filesystem object used for accessing the filesystem
+   * @param path path manifest file location
+   * @throws IOException
+   */
+  public void write(FileSystem fs, Path path) throws IOException {
+    FSDataOutputStream out = null;
+    try {
+      String outputJson = GSON.toJson(this._copyableUnits, CopyableUnitListType);
+      out = fs.create(path, true);
+      out.write(outputJson.getBytes(StandardCharsets.UTF_8));
+    } finally {
+      if (out != null) {
+        out.close();
+      }
+    }
+  }
+
+  public static CopyableUnitIterator getReadIterator(FileSystem fs, Path path) throws IOException {
+    return new CopyableUnitIterator(fs, path);
+  }
+
+  /**
+   * An iterator for CopyManifest for more efficient reading
+   */
+  public static class CopyableUnitIterator implements Iterator {
+    JsonReader reader;
+
+    public CopyableUnitIterator(FileSystem fs, Path path) throws IOException {
+      reader = new JsonReader(new InputStreamReader(fs.open(path), "UTF-8"));
+      reader.beginArray();
+    }
+
+    @Override
+    public boolean hasNext() {
+      try {
+        return reader.hasNext();
+      } catch (IOException e) {
+        e.printStackTrace();
+      }
+      return false;
+    }
+
+    @Override
+    public CopyManifest.CopyableUnit next() {
+      CopyManifest.CopyableUnit copyableUnit = GSON.fromJson(reader, CopyManifest.CopyableUnit.class);
+      if (copyableUnit.fileName == null) {
+        throw new IllegalArgumentException(MISSING_FN_MESSAGE);
+      }
+      return copyableUnit;
+    }
+
+    public void close() throws IOException {
+      if (reader != null) {
+        reader.endArray();
+        reader.close();
+      }
+    }
+  }
+}
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/ManifestBasedDataset.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/ManifestBasedDataset.java
index 9b7a56fc9..cf836591b 100644
--- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/ManifestBasedDataset.java
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/ManifestBasedDataset.java
@@ -22,11 +22,8 @@ import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.gson.Gson;
 import com.google.gson.JsonIOException;
-import com.google.gson.JsonObject;
 import com.google.gson.JsonSyntaxException;
-import com.google.gson.stream.JsonReader;
 import java.io.IOException;
-import java.io.InputStreamReader;
 import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
@@ -78,18 +75,17 @@ public class ManifestBasedDataset implements IterableCopyableDataset {
       throw new IOException(String.format("Manifest path %s on filesystem %s is a directory, which is not supported. Please set the manifest file locations in"
           + "%s, you can specify multi locations split by '',", manifestPath.toString(), fs.getUri().toString(), ManifestBasedDatasetFinder.MANIFEST_LOCATION));
     }
-    JsonReader reader = null;
+    CopyManifest.CopyableUnitIterator manifests = null;
     List<CopyEntity> copyEntities = Lists.newArrayList();
     List<FileStatus> toDelete = Lists.newArrayList();
     //todo: put permission preserve logic here?
     try {
-      reader = new JsonReader(new InputStreamReader(fs.open(manifestPath), "UTF-8"));
-      reader.beginArray();
-      while (reader.hasNext()) {
+      manifests = CopyManifest.getReadIterator(this.fs, this.manifestPath);
+      while (manifests.hasNext()) {
         //todo: We can use fileSet to partition the data in case of some softbound issue
         //todo: After partition, change this to directly return iterator so that we can save time if we meet resources limitation
-        JsonObject file = GSON.fromJson(reader, JsonObject.class);
-        Path fileToCopy = new Path(file.get("fileName").getAsString());
+        CopyManifest.CopyableUnit file = manifests.next();
+        Path fileToCopy = new Path(file.fileName);
         if (this.fs.exists(fileToCopy)) {
           if (!targetFs.exists(fileToCopy) || shouldCopy(this.fs.getFileStatus(fileToCopy), targetFs.getFileStatus(fileToCopy))) {
             CopyableFile copyableFile =
@@ -121,10 +117,9 @@ public class ManifestBasedDataset implements IterableCopyableDataset {
     } catch (Exception e ) {
       log.warn(String.format("Failed to process Manifest path %s on filesystem %s, due to", manifestPath.toString(), fs.getUri().toString()), e);
       throw new IOException(e);
-    }finally {
-      if(reader != null) {
-        reader.endArray();
-        reader.close();
+    } finally {
+      if (manifests != null) {
+        manifests.close();
       }
     }
     return Collections.singleton(new FileSet.Builder<>(datasetURN(), this).add(copyEntities).build()).iterator();
diff --git a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/dataset/TestCopyManifest.java b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/dataset/TestCopyManifest.java
new file mode 100644
index 000000000..b8efc8a3d
--- /dev/null
+++ b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/dataset/TestCopyManifest.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.data.management.dataset;
+
+import com.google.common.io.Files;
+import java.io.File;
+import java.io.IOException;
+import org.apache.gobblin.data.management.copy.CopyManifest;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+
+public class TestCopyManifest {
+  private FileSystem localFs;
+
+
+  public TestCopyManifest() throws IOException {
+    localFs = FileSystem.getLocal(new Configuration());
+  }
+
+  @Test
+  public void manifestSanityRead() throws IOException {
+    //Get manifest Path
+    String manifestPath =
+        getClass().getClassLoader().getResource("manifestBasedDistcpTest/sampleManifest.json").getPath();
+
+    CopyManifest manifest = CopyManifest.read(localFs, new Path(manifestPath));
+    Assert.assertEquals(manifest._copyableUnits.size(), 2);
+    CopyManifest.CopyableUnit cu = manifest._copyableUnits.get(0);
+    Assert.assertEquals(cu.fileName, "/tmp/dataset/test1.txt");
+  }
+
+  @Test
+  public void manifestSanityWrite() throws IOException {
+    File tmpDir = Files.createTempDir();
+    Path output = new Path(tmpDir.getAbsolutePath(), "test");
+    CopyManifest manifest = new CopyManifest();
+    manifest.add(new CopyManifest.CopyableUnit("testfilename", null, null, null));
+    manifest.write(localFs, output);
+
+    CopyManifest readManifest = CopyManifest.read(localFs, output);
+    Assert.assertEquals(readManifest._copyableUnits.size(), 1);
+    Assert.assertEquals(readManifest._copyableUnits.get(0).fileName, "testfilename");
+  }
+
+  @Test
+  public void manifestSanityReadIterator() throws IOException {
+    //Get manifest Path
+    String manifestPath =
+        getClass().getClassLoader().getResource("manifestBasedDistcpTest/sampleManifest.json").getPath();
+
+    CopyManifest manifest = CopyManifest.read(localFs, new Path(manifestPath));
+
+    CopyManifest.CopyableUnitIterator manifestIterator = CopyManifest.getReadIterator(localFs, new Path(manifestPath));
+    int count = 0;
+    while (manifestIterator.hasNext()) {
+      CopyManifest.CopyableUnit cu = manifestIterator.next();
+      Assert.assertEquals(cu.fileName, manifest._copyableUnits.get(count).fileName);
+      Assert.assertEquals(cu.fileGroup, manifest._copyableUnits.get(count).fileGroup);
+      count++;
+    }
+    Assert.assertEquals(count, 2);
+    Assert.assertEquals(count, manifest._copyableUnits.size());
+    manifestIterator.close();
+  }
+
+  @Test(expectedExceptions = IllegalArgumentException.class)
+  public void invalidCopyableUnit() {
+    new CopyManifest.CopyableUnit(null, null, null, null);
+  }
+
+  @Test(expectedExceptions = IllegalArgumentException.class)
+  public void invalidReadIteratorCopyManifest() throws IOException {
+    String manifestPath =
+        getClass().getClassLoader().getResource("manifestBasedDistcpTest/missingFileNameManifest.json").getPath();
+    CopyManifest.CopyableUnitIterator manifestIterator = CopyManifest.getReadIterator(localFs, new Path(manifestPath));
+    manifestIterator.next();
+  }
+}
diff --git a/gobblin-data-management/src/test/resources/manifestBasedDistcpTest/missingFileNameManifest.json b/gobblin-data-management/src/test/resources/manifestBasedDistcpTest/missingFileNameManifest.json
new file mode 100644
index 000000000..1b939a418
--- /dev/null
+++ b/gobblin-data-management/src/test/resources/manifestBasedDistcpTest/missingFileNameManifest.json
@@ -0,0 +1,7 @@
+[
+  {
+    "id":"1",
+    "fileGroup":"/tmp/dataset",
+    "fileSizeInBytes":"1024"
+  }
+]
\ No newline at end of file

Reply via email to