htran1 commented on a change in pull request #2566: [GOBBLIN-695] Adding 
utility functions to generate Avro/ORC binary using json
URL: https://github.com/apache/incubator-gobblin/pull/2566#discussion_r263075815
 
 

 ##########
 File path: 
gobblin-binary-management/src/main/java/org/apache/gobblin/binary_creation/AvroTestTools.java
 ##########
 @@ -0,0 +1,375 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.binary_creation;
+
+import com.google.common.collect.AbstractIterator;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.Spliterators;
+import java.util.TreeMap;
+import java.util.function.BiFunction;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+import javax.annotation.Nullable;
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+import lombok.experimental.Delegate;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.avro.Schema;
+import org.apache.avro.file.DataFileReader;
+import org.apache.avro.file.DataFileWriter;
+import org.apache.avro.file.SeekableInput;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.Decoder;
+import org.apache.avro.io.DecoderFactory;
+import org.apache.avro.mapred.FsInput;
+import org.apache.gobblin.util.FileListUtils;
+import org.apache.gobblin.util.PathUtils;
+import org.apache.gobblin.util.filters.HiddenFilter;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.reflections.Reflections;
+import org.reflections.scanners.ResourcesScanner;
+import org.reflections.util.ConfigurationBuilder;
+
+
+/**
+ * A implementation of {@link DataTestTools} for Avro format.
+ */
+@Slf4j
+public class AvroTestTools extends DataTestTools<AvroTestTools.RecordIterator, 
Schema> {
+
+  public boolean checkSameFilesAndRecords(TreeMap<String, RecordIterator> 
expected,
+      TreeMap<String, RecordIterator> observed, boolean allowDifferentOrder, 
Collection<String> blacklistRecordFields,
+      boolean allowDifferentSchema) {
+    Iterator<String> keys1 = expected.navigableKeySet().iterator();
+    Iterator<String> keys2 = observed.navigableKeySet().iterator();
+
+    return compareIterators(keys1, keys2, (key1, key2) -> {
+      if (!removeExtension(key1).equals(removeExtension(key2))) {
+        log.error(String.format("Mismatched files: %s and %s", key1, key2));
+        return false;
+      }
+
+      RecordIterator it1 = expected.get(key1);
+      RecordIterator it2 = observed.get(key2);
+
+      if (!allowDifferentSchema && !it1.getSchema().equals(it2.getSchema())) {
+        log.error(String.format("Mismatched schemas: %s and %s", key1, key2));
+        return false;
+      }
+
+      if (allowDifferentOrder) {
+        Set r1 = allowDifferentSchema
+            ? toSetWithBlacklistedFields(it1, blacklistRecordFields, 
GenericRecordWrapper::new)
+            : toSetWithBlacklistedFields(it1, blacklistRecordFields, 
Function.identity());
+        Set r2 = allowDifferentSchema
+            ? toSetWithBlacklistedFields(it2, blacklistRecordFields, 
GenericRecordWrapper::new)
+            : toSetWithBlacklistedFields(it2, blacklistRecordFields, 
Function.identity());
+        if (r1.equals(r2)) {
+          return true;
+        } else {
+          log.info("Sets of records differ.");
+          return false;
+        }
+      } else {
+        return compareIterators(it1, it2, (r1, r2) -> {
+          if (blacklistRecordFields != null) {
+            for (String blacklisted : blacklistRecordFields) {
+              r1.put(blacklisted, null);
+              r2.put(blacklisted, null);
+            }
+          }
+          return allowDifferentSchema ?
+              GenericRecordWrapper.compareGenericRecordRegardlessOfSchema(r1, 
r2) : r1.equals(r2);
+        });
+      }
+    });
+  }
+
+  private static <T> Set<T> toSetWithBlacklistedFields(Iterator<GenericRecord> 
it,
+      Collection<String> blacklistRecordFields, Function<GenericRecord, T> 
transform) {
+    return StreamSupport.stream(Spliterators.spliteratorUnknownSize(it, 0), 
false).map(r -> {
+      for (String blacklisted : blacklistRecordFields) {
+        r.put(blacklisted, null);
+      }
+      return transform.apply(r);
+    }).collect(Collectors.toSet());
+  }
+
+  static <T> boolean compareIterators(Iterator<T> expected, Iterator<T> 
observed, BiFunction<T, T, Boolean> comparator) {
+    while (expected.hasNext()) {
+      if (!observed.hasNext()) {
+        log.error("Expected has more elements than observed.");
+        return false;
+      }
+
+      T t1 = expected.next();
+      T t2 = observed.next();
+
+      boolean equals = comparator == null ? t1.equals(t2) : 
comparator.apply(t1, t2);
+
+      if (!equals) {
+        log.error(String.format("Mismatch: %s does not equal %s.", t1, t2));
+        return false;
+      }
+    }
+
+    if (observed.hasNext()) {
+      log.error("Observed has more elements than expected.");
+      return false;
+    }
+
+    return true;
+  }
+
+  /**
+   * Read all avro records in an HDFS location into a map from file name to 
{@link RecordIterator}.
+   */
+  @Override
+  public TreeMap<String, RecordIterator> 
readAllRecordsInBinaryDirectory(FileSystem fs, Path path)
+      throws IOException {
+    TreeMap<String, RecordIterator> output = new TreeMap<>();
+    if (!fs.exists(path)) {
+      return output;
+    }
+    PathFilter pathFilter = new HiddenFilter();
+    for (FileStatus status : FileListUtils.listFilesRecursively(fs, path, 
pathFilter)) {
+      SeekableInput sin = new FsInput(status.getPath(), fs);
+      DataFileReader<GenericRecord> dfr = new DataFileReader<>(sin, new 
GenericDatumReader<>());
+
+      String key = PathUtils.relativizePath(status.getPath(), path).toString();
+
+      output.put(key, new RecordIterator(dfr.getSchema(), new 
AbstractIterator<GenericRecord>() {
+        @Override
+        protected GenericRecord computeNext() {
+          if (dfr.hasNext()) {
+            return dfr.next();
+          } else {
+            try {
+              dfr.close();
+            } catch (IOException ioe) {
+              log.error("Failed to close data file reader.", ioe);
+            }
+            endOfData();
+            return null;
+          }
+        }
+      }));
+    }
+    return output;
+  }
+
+  /**
+   * Read all avro records in a json base resource in classpath into a map 
from file name to {@link RecordIterator}.
+   */
+  @Override
+  public TreeMap<String, RecordIterator> readAllRecordsInJsonResource(String 
baseResource, @Nullable Schema schema)
+      throws IOException {
+    if (schema == null) {
+      String schemaResource = new File(baseResource, "schema.avsc").toString();
+      schema = readAvscSchema(schemaResource, AvroTestTools.class);
+    }
+
+    TreeMap<String, RecordIterator> output = new TreeMap<>();
+    for (String file : getJsonFileSetByResourceRootName(baseResource)) {
+      log.info("Reading json record from " + file);
+      String name = PathUtils.relativizePath(new Path(file), new 
Path(baseResource)).toString();
+
+      String schemaResourceName = new File(new File(file).getParent(), 
"schema.avsc").toString();
+      Schema thisSchema = readAvscSchema(schemaResourceName, 
AvroTestTools.class);
+      Schema actualSchema = thisSchema == null ? schema : thisSchema;
+
+      InputStream is = 
AvroTestTools.class.getClassLoader().getResourceAsStream(file);
+      output.put(name, readRecordsFromJsonInputStream(actualSchema, is, 
DecoderFactory.get().jsonDecoder(actualSchema, is)));
+    }
+    return output;
+  }
+
+  private static RecordIterator readRecordsFromJsonInputStream(Schema schema, 
InputStream is, Decoder decoder) {
+    GenericDatumReader<GenericRecord> reader = new 
GenericDatumReader<>(schema);
+
+    return new RecordIterator(schema, new AbstractIterator<GenericRecord>() {
+      @Override
+      protected GenericRecord computeNext() {
+        try {
+          return reader.read(null, decoder);
+        } catch (IOException ioe) {
+          try {
+            is.close();
+          } catch (IOException exc) {
+            log.warn("Failed to close input stream.", exc);
+          }
+          endOfData();
+          return null;
+        }
+      }
+    });
+  }
+
+  /**
+   * Materialize records in a classpath package into HDFS avro records.
+   * @param baseResource name of the package. The package should contain the 
following:
+   *                     - Exactly one resource called <name>.avsc containing 
the schema of the records
+   *                       (or an explicit schema passed as an argument).
+   *                     - One or more data files called *.json containing the 
records.
+   * @param fs the {@link FileSystem} where the records will be written.
+   * @param targetPath the path where the records will be written.
+   * @param schema Schema of the records, or null to read automatically from a 
resource.
+   * @throws IOException
+   */
+
+  public Schema writeJsonResourceRecordsAsBinary(String baseResource, 
FileSystem fs, Path targetPath,
+      @Nullable Schema schema) throws IOException {
+    TreeMap<String, RecordIterator> recordMap = 
readAllRecordsInJsonResource(baseResource, schema);
+
+    Schema outputSchema = recordMap.lastEntry().getValue().getSchema();
+
+    for (Map.Entry<String, RecordIterator> entry : recordMap.entrySet()) {
+      writeAsAvroBinary(entry.getValue(), entry.getValue().getSchema(), fs, 
new Path(targetPath,
+          removeExtension(entry.getKey()) + ".avro"));
+    }
+
+    return outputSchema;
+  }
+
+  /**
+   * Read schema from an avsc resource file.
+   */
+  public static Schema readAvscSchema(String resource, Class loadedClass) 
throws IOException {
+    InputStream is = 
loadedClass.getClassLoader().getResourceAsStream(resource);
 
 Review comment:
   Use try with resources.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to