This is an automated email from the ASF dual-hosted git repository.
zihanli58 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 5598efe00 [GOBBLIN-1757]Refactor manifest, add reader/writer and
iterator for efficient reading (#3618)
5598efe00 is described below
commit 5598efe0005cd128d5a57ec20bea2fa317f25b08
Author: Tom <[email protected]>
AuthorDate: Tue Dec 20 15:31:43 2022 -0800
[GOBBLIN-1757]Refactor manifest, add reader/writer and iterator for
efficient reading (#3618)
* Refactor manifest, add reader/writer and iterator for efficient reading
* Fix PR comments
* Add apache copyright
* add basic validation of copy manifest schema for the one required field
and improve comments/docs
* Address PR comments, trying write in a try/finally
Co-authored-by: Tom McCormick <[email protected]>
---
.../gobblin/data/management/copy/CopyManifest.java | 155 +++++++++++++++++++++
.../data/management/copy/ManifestBasedDataset.java | 21 ++-
.../data/management/dataset/TestCopyManifest.java | 97 +++++++++++++
.../missingFileNameManifest.json | 7 +
4 files changed, 267 insertions(+), 13 deletions(-)
diff --git
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopyManifest.java
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopyManifest.java
new file mode 100644
index 000000000..133d34eab
--- /dev/null
+++
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopyManifest.java
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.data.management.copy;
+
+import com.google.common.reflect.TypeToken;
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+import com.google.gson.stream.JsonReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.lang.reflect.Type;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Iterator;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+
+/**
+ * Copy Manifest schema and serDe for manifest based copy
+ *
https://iwww.corp.linkedin.com/wiki/cf/display/ENGS/Manifest+based+distcp+runbook
+ */
+public class CopyManifest {
+ private static final String MISSING_FN_MESSAGE = "fileName cannot be null";
+ private static final Gson GSON = new
GsonBuilder().setPrettyPrinting().create();
+ private static final Type CopyableUnitListType = new
TypeToken<ArrayList<CopyableUnit>>(){}.getType();
+
+ public final ArrayList<CopyableUnit> _copyableUnits;
+
+ public CopyManifest() {
+ _copyableUnits = new ArrayList<>();
+ }
+
+ public CopyManifest(ArrayList<CopyableUnit> copyableUnits) {
+ _copyableUnits = copyableUnits;
+ }
+
+ /**
+ * Add a new copyable unit to a copy manifest. Used for building a manifest
+ * @param copyableUnit
+ */
+ public void add(CopyManifest.CopyableUnit copyableUnit) {
+ _copyableUnits.add(copyableUnit);
+ }
+
+ /**
+ * One item in a copy manifest
+ * Only filename is required
+ */
+ public static class CopyableUnit {
+ public final String fileName;
+ public final String fileGroup;
+ public final Long fileSizeInBytes;
+ public final Long fileModificationTime;
+
+ public CopyableUnit(String fileName, String fileGroup, Long
fileSizeInBytes, Long fileModificationTime) {
+ this.fileName = fileName;
+ this.fileGroup = fileGroup;
+ this.fileSizeInBytes = fileSizeInBytes;
+ this.fileModificationTime = fileModificationTime;
+ if (this.fileName == null) {
+ throw new IllegalArgumentException(MISSING_FN_MESSAGE);
+ }
+ }
+ }
+
+ /**
+ * Note: naive read does not do validation of schema. For schema validation
use CopyableUnitIterator
+ * @param fs filesystem object used for accessing the filesystem
+ * @param path path manifest file location
+ * @return a copy manifest object from the json representation at path
+ * @throws IOException
+ */
+ public static CopyManifest read(FileSystem fs, Path path) throws IOException
{
+ JsonReader jsonReader = new JsonReader(new
InputStreamReader(fs.open(path), "UTF-8"));
+ return new CopyManifest(GSON.fromJson(jsonReader, CopyableUnitListType));
+ }
+
+ /**
+ *
+ * @param fs filesystem object used for accessing the filesystem
+ * @param path path manifest file location
+ * @throws IOException
+ */
+ public void write(FileSystem fs, Path path) throws IOException {
+ FSDataOutputStream out = null;
+ try {
+ String outputJson = GSON.toJson(this._copyableUnits,
CopyableUnitListType);
+ out = fs.create(path, true);
+ out.write(outputJson.getBytes(StandardCharsets.UTF_8));
+ } finally {
+ if (out != null) {
+ out.close();
+ }
+ }
+ }
+
+ public static CopyableUnitIterator getReadIterator(FileSystem fs, Path path)
throws IOException {
+ return new CopyableUnitIterator(fs, path);
+ }
+
+ /**
+ * An iterator for CopyManifest for more efficient reading
+ */
+ public static class CopyableUnitIterator implements Iterator {
+ JsonReader reader;
+
+ public CopyableUnitIterator(FileSystem fs, Path path) throws IOException {
+ reader = new JsonReader(new InputStreamReader(fs.open(path), "UTF-8"));
+ reader.beginArray();
+ }
+
+ @Override
+ public boolean hasNext() {
+ try {
+ return reader.hasNext();
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ return false;
+ }
+
+ @Override
+ public CopyManifest.CopyableUnit next() {
+ CopyManifest.CopyableUnit copyableUnit = GSON.fromJson(reader,
CopyManifest.CopyableUnit.class);
+ if (copyableUnit.fileName == null) {
+ throw new IllegalArgumentException(MISSING_FN_MESSAGE);
+ }
+ return copyableUnit;
+ }
+
+ public void close() throws IOException {
+ if (reader != null) {
+ reader.endArray();
+ reader.close();
+ }
+ }
+ }
+}
diff --git
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/ManifestBasedDataset.java
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/ManifestBasedDataset.java
index 9b7a56fc9..cf836591b 100644
---
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/ManifestBasedDataset.java
+++
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/ManifestBasedDataset.java
@@ -22,11 +22,8 @@ import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.gson.Gson;
import com.google.gson.JsonIOException;
-import com.google.gson.JsonObject;
import com.google.gson.JsonSyntaxException;
-import com.google.gson.stream.JsonReader;
import java.io.IOException;
-import java.io.InputStreamReader;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
@@ -78,18 +75,17 @@ public class ManifestBasedDataset implements
IterableCopyableDataset {
throw new IOException(String.format("Manifest path %s on filesystem %s
is a directory, which is not supported. Please set the manifest file locations
in"
+ "%s, you can specify multi locations split by '',",
manifestPath.toString(), fs.getUri().toString(),
ManifestBasedDatasetFinder.MANIFEST_LOCATION));
}
- JsonReader reader = null;
+ CopyManifest.CopyableUnitIterator manifests = null;
List<CopyEntity> copyEntities = Lists.newArrayList();
List<FileStatus> toDelete = Lists.newArrayList();
//todo: put permission preserve logic here?
try {
- reader = new JsonReader(new InputStreamReader(fs.open(manifestPath),
"UTF-8"));
- reader.beginArray();
- while (reader.hasNext()) {
+ manifests = CopyManifest.getReadIterator(this.fs, this.manifestPath);
+ while (manifests.hasNext()) {
//todo: We can use fileSet to partition the data in case of some
softbound issue
//todo: After partition, change this to directly return iterator so
that we can save time if we meet resources limitation
- JsonObject file = GSON.fromJson(reader, JsonObject.class);
- Path fileToCopy = new Path(file.get("fileName").getAsString());
+ CopyManifest.CopyableUnit file = manifests.next();
+ Path fileToCopy = new Path(file.fileName);
if (this.fs.exists(fileToCopy)) {
if (!targetFs.exists(fileToCopy) ||
shouldCopy(this.fs.getFileStatus(fileToCopy),
targetFs.getFileStatus(fileToCopy))) {
CopyableFile copyableFile =
@@ -121,10 +117,9 @@ public class ManifestBasedDataset implements
IterableCopyableDataset {
} catch (Exception e ) {
log.warn(String.format("Failed to process Manifest path %s on filesystem
%s, due to", manifestPath.toString(), fs.getUri().toString()), e);
throw new IOException(e);
- }finally {
- if(reader != null) {
- reader.endArray();
- reader.close();
+ } finally {
+ if (manifests != null) {
+ manifests.close();
}
}
return Collections.singleton(new FileSet.Builder<>(datasetURN(),
this).add(copyEntities).build()).iterator();
diff --git
a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/dataset/TestCopyManifest.java
b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/dataset/TestCopyManifest.java
new file mode 100644
index 000000000..b8efc8a3d
--- /dev/null
+++
b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/dataset/TestCopyManifest.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.data.management.dataset;
+
+import com.google.common.io.Files;
+import java.io.File;
+import java.io.IOException;
+import org.apache.gobblin.data.management.copy.CopyManifest;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+
+public class TestCopyManifest {
+ private FileSystem localFs;
+
+
+ public TestCopyManifest() throws IOException {
+ localFs = FileSystem.getLocal(new Configuration());
+ }
+
+ @Test
+ public void manifestSanityRead() throws IOException {
+ //Get manifest Path
+ String manifestPath =
+
getClass().getClassLoader().getResource("manifestBasedDistcpTest/sampleManifest.json").getPath();
+
+ CopyManifest manifest = CopyManifest.read(localFs, new Path(manifestPath));
+ Assert.assertEquals(manifest._copyableUnits.size(), 2);
+ CopyManifest.CopyableUnit cu = manifest._copyableUnits.get(0);
+ Assert.assertEquals(cu.fileName, "/tmp/dataset/test1.txt");
+ }
+
+ @Test
+ public void manifestSanityWrite() throws IOException {
+ File tmpDir = Files.createTempDir();
+ Path output = new Path(tmpDir.getAbsolutePath(), "test");
+ CopyManifest manifest = new CopyManifest();
+ manifest.add(new CopyManifest.CopyableUnit("testfilename", null, null,
null));
+ manifest.write(localFs, output);
+
+ CopyManifest readManifest = CopyManifest.read(localFs, output);
+ Assert.assertEquals(readManifest._copyableUnits.size(), 1);
+ Assert.assertEquals(readManifest._copyableUnits.get(0).fileName,
"testfilename");
+ }
+
+ @Test
+ public void manifestSanityReadIterator() throws IOException {
+ //Get manifest Path
+ String manifestPath =
+
getClass().getClassLoader().getResource("manifestBasedDistcpTest/sampleManifest.json").getPath();
+
+ CopyManifest manifest = CopyManifest.read(localFs, new Path(manifestPath));
+
+ CopyManifest.CopyableUnitIterator manifestIterator =
CopyManifest.getReadIterator(localFs, new Path(manifestPath));
+ int count = 0;
+ while (manifestIterator.hasNext()) {
+ CopyManifest.CopyableUnit cu = manifestIterator.next();
+ Assert.assertEquals(cu.fileName,
manifest._copyableUnits.get(count).fileName);
+ Assert.assertEquals(cu.fileGroup,
manifest._copyableUnits.get(count).fileGroup);
+ count++;
+ }
+ Assert.assertEquals(count, 2);
+ Assert.assertEquals(count, manifest._copyableUnits.size());
+ manifestIterator.close();
+ }
+
+ @Test(expectedExceptions = IllegalArgumentException.class)
+ public void invalidCopyableUnit() {
+ new CopyManifest.CopyableUnit(null, null, null, null);
+ }
+
+ @Test(expectedExceptions = IllegalArgumentException.class)
+ public void invalidReadIteratorCopyManifest() throws IOException {
+ String manifestPath =
+
getClass().getClassLoader().getResource("manifestBasedDistcpTest/missingFileNameManifest.json").getPath();
+ CopyManifest.CopyableUnitIterator manifestIterator =
CopyManifest.getReadIterator(localFs, new Path(manifestPath));
+ manifestIterator.next();
+ }
+}
diff --git
a/gobblin-data-management/src/test/resources/manifestBasedDistcpTest/missingFileNameManifest.json
b/gobblin-data-management/src/test/resources/manifestBasedDistcpTest/missingFileNameManifest.json
new file mode 100644
index 000000000..1b939a418
--- /dev/null
+++
b/gobblin-data-management/src/test/resources/manifestBasedDistcpTest/missingFileNameManifest.json
@@ -0,0 +1,7 @@
+[
+ {
+ "id":"1",
+ "fileGroup":"/tmp/dataset",
+ "fileSizeInBytes":"1024"
+ }
+]
\ No newline at end of file