htran1 commented on a change in pull request #2566: [GOBBLIN-695] Adding utility functions to generate Avro/ORC binary using json
URL: https://github.com/apache/incubator-gobblin/pull/2566#discussion_r263076168
########## File path: gobblin-binary-management/src/main/java/org/apache/gobblin/binary_creation/AvroTestTools.java ##########
@@ -0,0 +1,375 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.binary_creation;
+
+import com.google.common.collect.AbstractIterator;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.Spliterators;
+import java.util.TreeMap;
+import java.util.function.BiFunction;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+import javax.annotation.Nullable;
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+import lombok.experimental.Delegate;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.avro.Schema;
+import org.apache.avro.file.DataFileReader;
+import org.apache.avro.file.DataFileWriter;
+import org.apache.avro.file.SeekableInput;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.Decoder;
+import org.apache.avro.io.DecoderFactory;
+import org.apache.avro.mapred.FsInput;
+import org.apache.gobblin.util.FileListUtils;
+import org.apache.gobblin.util.PathUtils;
+import org.apache.gobblin.util.filters.HiddenFilter;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.reflections.Reflections;
+import org.reflections.scanners.ResourcesScanner;
+import org.reflections.util.ConfigurationBuilder;
+
+
+/**
+ * An implementation of {@link DataTestTools} for Avro format.
+ */
+@Slf4j
+public class AvroTestTools extends DataTestTools<AvroTestTools.RecordIterator, Schema> {
+
+  public boolean checkSameFilesAndRecords(TreeMap<String, RecordIterator> expected,
+      TreeMap<String, RecordIterator> observed, boolean allowDifferentOrder, Collection<String> blacklistRecordFields,
+      boolean allowDifferentSchema) {
+    Iterator<String> keys1 = expected.navigableKeySet().iterator();
+    Iterator<String> keys2 = observed.navigableKeySet().iterator();
+
+    return compareIterators(keys1, keys2, (key1, key2) -> {
+      if (!removeExtension(key1).equals(removeExtension(key2))) {
+        log.error(String.format("Mismatched files: %s and %s", key1, key2));
+        return false;
+      }
+
+      RecordIterator it1 = expected.get(key1);
+      RecordIterator it2 = observed.get(key2);
+
+      if (!allowDifferentSchema && !it1.getSchema().equals(it2.getSchema())) {
+        log.error(String.format("Mismatched schemas: %s and %s", key1, key2));
+        return false;
+      }
+
+      if (allowDifferentOrder) {
+        Set r1 = allowDifferentSchema
+            ? toSetWithBlacklistedFields(it1, blacklistRecordFields, GenericRecordWrapper::new)
+            : toSetWithBlacklistedFields(it1, blacklistRecordFields, Function.identity());
+        Set r2 = allowDifferentSchema
+            ? toSetWithBlacklistedFields(it2, blacklistRecordFields, GenericRecordWrapper::new)
+            : toSetWithBlacklistedFields(it2, blacklistRecordFields, Function.identity());
+        if (r1.equals(r2)) {
+          return true;
+        } else {
+          log.info("Sets of records differ.");
+          return false;
+        }
+      } else {
+        return compareIterators(it1, it2, (r1, r2) -> {
+          if (blacklistRecordFields != null) {
+            for (String blacklisted : blacklistRecordFields) {
+              r1.put(blacklisted, null);
+              r2.put(blacklisted, null);
+            }
+          }
+          return allowDifferentSchema ?
+              GenericRecordWrapper.compareGenericRecordRegardlessOfSchema(r1, r2) : r1.equals(r2);
+        });
+      }
+    });
+  }
+
+  private static <T> Set<T> toSetWithBlacklistedFields(Iterator<GenericRecord> it,
+      Collection<String> blacklistRecordFields, Function<GenericRecord, T> transform) {
+    return StreamSupport.stream(Spliterators.spliteratorUnknownSize(it, 0), false).map(r -> {
+      for (String blacklisted : blacklistRecordFields) {
+        r.put(blacklisted, null);
+      }
+      return transform.apply(r);
+    }).collect(Collectors.toSet());
+  }
+
+  static <T> boolean compareIterators(Iterator<T> expected, Iterator<T> observed, BiFunction<T, T, Boolean> comparator) {
+    while (expected.hasNext()) {
+      if (!observed.hasNext()) {
+        log.error("Expected has more elements than observed.");
+        return false;
+      }
+
+      T t1 = expected.next();
+      T t2 = observed.next();
+
+      boolean equals = comparator == null ? t1.equals(t2) : comparator.apply(t1, t2);
+
+      if (!equals) {
+        log.error(String.format("Mismatch: %s does not equal %s.", t1, t2));
+        return false;
+      }
+    }
+
+    if (observed.hasNext()) {
+      log.error("Observed has more elements than expected.");
+      return false;
+    }
+
+    return true;
+  }
+
+  /**
+   * Read all avro records in an HDFS location into a map from file name to {@link RecordIterator}.
+   */
+  @Override
+  public TreeMap<String, RecordIterator> readAllRecordsInBinaryDirectory(FileSystem fs, Path path)
+      throws IOException {
+    TreeMap<String, RecordIterator> output = new TreeMap<>();
+    if (!fs.exists(path)) {
+      return output;
+    }
+    PathFilter pathFilter = new HiddenFilter();
+    for (FileStatus status : FileListUtils.listFilesRecursively(fs, path, pathFilter)) {
+      SeekableInput sin = new FsInput(status.getPath(), fs);
+      DataFileReader<GenericRecord> dfr = new DataFileReader<>(sin, new GenericDatumReader<>());
+
+      String key = PathUtils.relativizePath(status.getPath(), path).toString();
+
+      output.put(key, new RecordIterator(dfr.getSchema(), new AbstractIterator<GenericRecord>() {
+        @Override
+        protected GenericRecord computeNext() {
+          if (dfr.hasNext()) {
+            return dfr.next();
+          } else {
+            try {
+              dfr.close();
+            } catch (IOException ioe) {
+              log.error("Failed to close data file reader.", ioe);
+            }
+            endOfData();
+            return null;
+          }
+        }
+      }));
+    }
+    return output;
+  }
+
+  /**
+   * Read all avro records in a json base resource on the classpath into a map from file name to {@link RecordIterator}.
+   */
+  @Override
+  public TreeMap<String, RecordIterator> readAllRecordsInJsonResource(String baseResource, @Nullable Schema schema)
+      throws IOException {
+    if (schema == null) {
+      String schemaResource = new File(baseResource, "schema.avsc").toString();
+      schema = readAvscSchema(schemaResource, AvroTestTools.class);
+    }
+
+    TreeMap<String, RecordIterator> output = new TreeMap<>();
+    for (String file : getJsonFileSetByResourceRootName(baseResource)) {
+      log.info("Reading json record from " + file);
+      String name = PathUtils.relativizePath(new Path(file), new Path(baseResource)).toString();
+
+      String schemaResourceName = new File(new File(file).getParent(), "schema.avsc").toString();
+      Schema thisSchema = readAvscSchema(schemaResourceName, AvroTestTools.class);
+      Schema actualSchema = thisSchema == null ? schema : thisSchema;
+
+      InputStream is = AvroTestTools.class.getClassLoader().getResourceAsStream(file);

Review comment:
   Use try-with-resources to close the stream when done.
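   A minimal sketch of the suggested pattern, assuming the surrounding readAllRecordsInJsonResource context from the hunk above. The decode loop is illustrative only: the hunk ends at the stream open, so the loop body is guessed from the DecoderFactory/GenericDatumReader imports already in the diff, and would additionally need java.io.EOFException imported.

       // try-with-resources closes the classpath stream even if decoding throws
       try (InputStream is = AvroTestTools.class.getClassLoader().getResourceAsStream(file)) {
         Decoder decoder = DecoderFactory.get().jsonDecoder(actualSchema, is);
         GenericDatumReader<GenericRecord> reader = new GenericDatumReader<>(actualSchema);
         try {
           while (true) {
             GenericRecord record = reader.read(null, decoder);
             // accumulate 'record' here (hypothetical; the real loop body is not shown in the hunk)
           }
         } catch (EOFException eof) {
           // Avro's json decoder signals end of input this way; the stream is still closed on exit
         }
       }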
