This is an automated email from the ASF dual-hosted git repository.

pwason pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
     new b8fe5b9  [HUDI-764] [HUDI-765] ORC reader writer Implementation (#2999)
b8fe5b9 is described below

commit b8fe5b91d599418cd908d833fd63edc7f362c548
Author: Jintao Guan <jintao.g...@uber.com>
AuthorDate: Tue Jun 15 15:21:43 2021 -0700

    [HUDI-764] [HUDI-765] ORC reader writer Implementation (#2999)

    Co-authored-by: Qingyun (Teresa) Kang <kter...@uber.com>
---
 LICENSE                                            |  12 +
 NOTICE                                             |  12 +
 .../apache/hudi/config/HoodieStorageConfig.java    |  42 ++
 .../org/apache/hudi/config/HoodieWriteConfig.java  |  17 +
 .../apache/hudi/io/storage/HoodieFileWriter.java   |  10 +
 .../hudi/io/storage/HoodieFileWriterFactory.java   |  13 +
 .../apache/hudi/io/storage/HoodieHFileWriter.java  |  10 +-
 .../apache/hudi/io/storage/HoodieOrcConfig.java    |  72 ++
 .../apache/hudi/io/storage/HoodieOrcWriter.java    | 172 +++++
 .../hudi/io/storage/HoodieParquetWriter.java       |   9 +-
 .../java/org/apache/hudi/table/HoodieTable.java    |   1 +
 .../hudi/io/storage/TestHoodieOrcReaderWriter.java | 261 +++++++
 .../src/test/resources/exampleSchemaWithUDT.avsc   |  67 ++
 .../io/storage/TestHoodieFileWriterFactory.java    |   7 +
 hudi-common/pom.xml                                |   8 +
 .../apache/hudi/common/model/HoodieFileFormat.java |   3 +-
 .../org/apache/hudi/common/util/AvroOrcUtils.java  | 799 +++++++++++++++++++++
 .../org/apache/hudi/common/util/BaseFileUtils.java | 133 +++-
 .../apache/hudi/common/util/OrcReaderIterator.java | 118 +++
 .../java/org/apache/hudi/common/util/OrcUtils.java | 235 ++++++
 .../org/apache/hudi/common/util/ParquetUtils.java  |  60 +-
 .../hudi/io/storage/HoodieFileReaderFactory.java   |   8 +
 .../apache/hudi/io/storage/HoodieOrcReader.java    |  91 +++
 .../apache/hudi/common/util/TestAvroOrcUtils.java  |  76 ++
 .../hudi/common/util/TestOrcReaderIterator.java    |  92 +++
 .../io/storage/TestHoodieFileReaderFactory.java    |   7 +-
 .../hudi/hadoop/utils/HoodieInputFormatUtils.java  |   9 +
 .../main/scala/org/apache/hudi/DefaultSource.scala |  13 +-
 pom.xml                                            |   2 +
 29 files changed, 2268 insertions(+), 91 deletions(-)

diff --git a/LICENSE b/LICENSE
index 385191d..28222a7 100644
--- a/LICENSE
+++ b/LICENSE
@@ -333,3 +333,15 @@ Copyright (c) 2005, European Commission project OneLab under contract 034819 (ht
 Home page: https://commons.apache.org/proper/commons-lang/
 License: http://www.apache.org/licenses/LICENSE-2.0
+
+-------------------------------------------------------------------------------
+
+This product includes code from StreamSets Data Collector
+
+ * com.streamsets.pipeline.lib.util.avroorc.AvroToOrcRecordConverter copied and modified to org.apache.hudi.common.util.AvroOrcUtils
+ * com.streamsets.pipeline.lib.util.avroorc.AvroToOrcSchemaConverter copied and modified to org.apache.hudi.common.util.AvroOrcUtils
+
+Copyright 2018 StreamSets Inc.
+
+Home page: https://github.com/streamsets/datacollector-oss
+License: http://www.apache.org/licenses/LICENSE-2.0
diff --git a/NOTICE b/NOTICE
index 2f1aee6..9b24933 100644
--- a/NOTICE
+++ b/NOTICE
@@ -147,3 +147,15 @@ its NOTICE file:
 This product includes software developed at
 The Apache Software Foundation (http://www.apache.org/).
+
+--------------------------------------------------------------------------------
+
+This product includes code from StreamSets Data Collector, which includes the following in
+its NOTICE file:
+
+  StreamSets datacollector-oss
+  Copyright 2018 StreamSets Inc.
+
+  This product includes software developed at
+  StreamSets (http://www.streamsets.com/).
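For reference, a minimal sketch of how the ORC storage options introduced by this patch (hoodie.orc.max.file.size, hoodie.orc.stripe.size, hoodie.orc.block.size, hoodie.orc.compression.codec; see HoodieStorageConfig below) could be set from client code. The orc* builder methods come from this diff; the HoodieWriteConfig wiring and the basePath variable are assumed from the pre-existing Hudi builder APIs and are illustrative only:

    import org.apache.hudi.config.HoodieStorageConfig;
    import org.apache.hudi.config.HoodieWriteConfig;

    // Illustrative sketch, not part of this patch. The orc* builder methods are the ones
    // added to HoodieStorageConfig by this commit; newBuilder()/withPath()/withStorageConfig()
    // are existing Hudi builder APIs, and basePath is a placeholder for the table path.
    HoodieStorageConfig storageConfig = HoodieStorageConfig.newBuilder()
        .orcMaxFileSize(120 * 1024 * 1024)   // hoodie.orc.max.file.size (default 120 MB)
        .orcStripeSize(64 * 1024 * 1024)     // hoodie.orc.stripe.size: in-memory write buffer (default 64 MB)
        .orcBlockSize(120 * 1024 * 1024)     // hoodie.orc.block.size: file system block size (default = max file size)
        .orcCompressionCodec("ZLIB")         // hoodie.orc.compression.codec, parsed into org.apache.orc.CompressionKind
        .build();

    HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder()
        .withPath(basePath)
        .withStorageConfig(storageConfig)
        .build();

With these defaults, HoodieOrcWriter (added below) sizes ORC stripes and blocks from the write config and rolls to a new base file once hoodie.orc.max.file.size is reached.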
+ diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieStorageConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieStorageConfig.java index 50b45f3..3cd8817 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieStorageConfig.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieStorageConfig.java @@ -39,10 +39,21 @@ public class HoodieStorageConfig extends DefaultHoodieConfig { public static final String DEFAULT_PARQUET_BLOCK_SIZE_BYTES = DEFAULT_PARQUET_FILE_MAX_BYTES; public static final String PARQUET_PAGE_SIZE_BYTES = "hoodie.parquet.page.size"; public static final String DEFAULT_PARQUET_PAGE_SIZE_BYTES = String.valueOf(1 * 1024 * 1024); + public static final String HFILE_FILE_MAX_BYTES = "hoodie.hfile.max.file.size"; public static final String HFILE_BLOCK_SIZE_BYTES = "hoodie.hfile.block.size"; public static final String DEFAULT_HFILE_BLOCK_SIZE_BYTES = String.valueOf(1 * 1024 * 1024); public static final String DEFAULT_HFILE_FILE_MAX_BYTES = String.valueOf(120 * 1024 * 1024); + + public static final String ORC_FILE_MAX_BYTES = "hoodie.orc.max.file.size"; + public static final String DEFAULT_ORC_FILE_MAX_BYTES = String.valueOf(120 * 1024 * 1024); + // size of the memory buffer in bytes for writing + public static final String ORC_STRIPE_SIZE = "hoodie.orc.stripe.size"; + public static final String DEFAULT_ORC_STRIPE_SIZE = String.valueOf(64 * 1024 * 1024); + // file system block size + public static final String ORC_BLOCK_SIZE = "hoodie.orc.block.size"; + public static final String DEFAULT_ORC_BLOCK_SIZE = DEFAULT_ORC_FILE_MAX_BYTES; + // used to size log files public static final String LOGFILE_SIZE_MAX_BYTES = "hoodie.logfile.max.size"; public static final String DEFAULT_LOGFILE_SIZE_MAX_BYTES = String.valueOf(1024 * 1024 * 1024); // 1 GB @@ -54,9 +65,11 @@ public class HoodieStorageConfig extends DefaultHoodieConfig { public static final String DEFAULT_STREAM_COMPRESSION_RATIO = String.valueOf(0.1); public static final String PARQUET_COMPRESSION_CODEC = "hoodie.parquet.compression.codec"; public static final String HFILE_COMPRESSION_ALGORITHM = "hoodie.hfile.compression.algorithm"; + public static final String ORC_COMPRESSION_CODEC = "hoodie.orc.compression.codec"; // Default compression codec for parquet public static final String DEFAULT_PARQUET_COMPRESSION_CODEC = "gzip"; public static final String DEFAULT_HFILE_COMPRESSION_ALGORITHM = "GZ"; + public static final String DEFAULT_ORC_COMPRESSION_CODEC = "ZLIB"; public static final String LOGFILE_TO_PARQUET_COMPRESSION_RATIO = "hoodie.logfile.to.parquet.compression.ratio"; // Default compression ratio for log file to parquet, general 3x public static final String DEFAULT_LOGFILE_TO_PARQUET_COMPRESSION_RATIO = String.valueOf(0.35); @@ -140,6 +153,26 @@ public class HoodieStorageConfig extends DefaultHoodieConfig { return this; } + public Builder orcMaxFileSize(long maxFileSize) { + props.setProperty(ORC_FILE_MAX_BYTES, String.valueOf(maxFileSize)); + return this; + } + + public Builder orcStripeSize(int orcStripeSize) { + props.setProperty(ORC_STRIPE_SIZE, String.valueOf(orcStripeSize)); + return this; + } + + public Builder orcBlockSize(int orcBlockSize) { + props.setProperty(ORC_BLOCK_SIZE, String.valueOf(orcBlockSize)); + return this; + } + + public Builder orcCompressionCodec(String orcCompressionCodec) { + props.setProperty(ORC_COMPRESSION_CODEC, orcCompressionCodec); + return this; + } + public 
HoodieStorageConfig build() { HoodieStorageConfig config = new HoodieStorageConfig(props); setDefaultOnCondition(props, !props.containsKey(PARQUET_FILE_MAX_BYTES), PARQUET_FILE_MAX_BYTES, @@ -166,6 +199,15 @@ public class HoodieStorageConfig extends DefaultHoodieConfig { setDefaultOnCondition(props, !props.containsKey(HFILE_FILE_MAX_BYTES), HFILE_FILE_MAX_BYTES, DEFAULT_HFILE_FILE_MAX_BYTES); + setDefaultOnCondition(props, !props.containsKey(ORC_FILE_MAX_BYTES), ORC_FILE_MAX_BYTES, + DEFAULT_ORC_FILE_MAX_BYTES); + setDefaultOnCondition(props, !props.containsKey(ORC_STRIPE_SIZE), ORC_STRIPE_SIZE, + DEFAULT_ORC_STRIPE_SIZE); + setDefaultOnCondition(props, !props.containsKey(ORC_BLOCK_SIZE), ORC_BLOCK_SIZE, + DEFAULT_ORC_BLOCK_SIZE); + setDefaultOnCondition(props, !props.containsKey(ORC_COMPRESSION_CODEC), ORC_COMPRESSION_CODEC, + DEFAULT_ORC_COMPRESSION_CODEC); + return config; } } diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java index cf5ac5c..9e89e0e 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java @@ -42,6 +42,7 @@ import org.apache.hudi.metrics.MetricsReporterType; import org.apache.hudi.metrics.datadog.DatadogHttpClient.ApiSite; import org.apache.hudi.table.action.compact.CompactionTriggerStrategy; import org.apache.hudi.table.action.compact.strategy.CompactionStrategy; +import org.apache.orc.CompressionKind; import org.apache.parquet.hadoop.metadata.CompressionCodecName; import javax.annotation.concurrent.Immutable; @@ -784,6 +785,22 @@ public class HoodieWriteConfig extends DefaultHoodieConfig { return Compression.Algorithm.valueOf(props.getProperty(HoodieStorageConfig.HFILE_COMPRESSION_ALGORITHM)); } + public long getOrcMaxFileSize() { + return Long.parseLong(props.getProperty(HoodieStorageConfig.ORC_FILE_MAX_BYTES)); + } + + public int getOrcStripeSize() { + return Integer.parseInt(props.getProperty(HoodieStorageConfig.ORC_STRIPE_SIZE)); + } + + public int getOrcBlockSize() { + return Integer.parseInt(props.getProperty(HoodieStorageConfig.ORC_BLOCK_SIZE)); + } + + public CompressionKind getOrcCompressionCodec() { + return CompressionKind.valueOf(props.getProperty(HoodieStorageConfig.ORC_COMPRESSION_CODEC)); + } + /** * metrics properties. 
*/ diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieFileWriter.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieFileWriter.java index 1aaa389..a579234 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieFileWriter.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieFileWriter.java @@ -18,6 +18,9 @@ package org.apache.hudi.io.storage; +import java.util.concurrent.atomic.AtomicLong; +import org.apache.avro.generic.GenericRecord; +import org.apache.hudi.avro.HoodieAvroUtils; import org.apache.hudi.common.model.HoodieRecord; import org.apache.avro.generic.IndexedRecord; @@ -35,4 +38,11 @@ public interface HoodieFileWriter<R extends IndexedRecord> { void writeAvro(String key, R oldRecord) throws IOException; long getBytesWritten(); + + default void prepRecordWithMetadata(R avroRecord, HoodieRecord record, String instantTime, Integer partitionId, AtomicLong recordIndex, String fileName) { + String seqId = HoodieRecord.generateSequenceId(instantTime, partitionId, recordIndex.getAndIncrement()); + HoodieAvroUtils.addHoodieKeyToRecord((GenericRecord) avroRecord, record.getRecordKey(), record.getPartitionPath(), fileName); + HoodieAvroUtils.addCommitMetadataToRecord((GenericRecord) avroRecord, instantTime, seqId); + return; + } } diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieFileWriterFactory.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieFileWriterFactory.java index 23701b0..96f19ca 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieFileWriterFactory.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieFileWriterFactory.java @@ -34,6 +34,7 @@ import org.apache.parquet.avro.AvroSchemaConverter; import java.io.IOException; +import static org.apache.hudi.common.model.HoodieFileFormat.ORC; import static org.apache.hudi.common.model.HoodieFileFormat.PARQUET; import static org.apache.hudi.common.model.HoodieFileFormat.HFILE; @@ -49,6 +50,9 @@ public class HoodieFileWriterFactory { if (HFILE.getFileExtension().equals(extension)) { return newHFileFileWriter(instantTime, path, config, schema, hoodieTable, taskContextSupplier); } + if (ORC.getFileExtension().equals(extension)) { + return newOrcFileWriter(instantTime, path, config, schema, hoodieTable, taskContextSupplier); + } throw new UnsupportedOperationException(extension + " format not supported yet."); } @@ -77,6 +81,15 @@ public class HoodieFileWriterFactory { return new HoodieHFileWriter<>(instantTime, path, hfileConfig, schema, taskContextSupplier); } + private static <T extends HoodieRecordPayload, R extends IndexedRecord> HoodieFileWriter<R> newOrcFileWriter( + String instantTime, Path path, HoodieWriteConfig config, Schema schema, HoodieTable hoodieTable, + TaskContextSupplier taskContextSupplier) throws IOException { + BloomFilter filter = createBloomFilter(config); + HoodieOrcConfig orcConfig = new HoodieOrcConfig(hoodieTable.getHadoopConf(), config.getOrcCompressionCodec(), + config.getOrcStripeSize(), config.getOrcBlockSize(), config.getOrcMaxFileSize(), filter); + return new HoodieOrcWriter<>(instantTime, path, orcConfig, schema, taskContextSupplier); + } + private static BloomFilter createBloomFilter(HoodieWriteConfig config) { return BloomFilterFactory.createBloomFilter(config.getBloomFilterNumEntries(), config.getBloomFilterFPP(), 
config.getDynamicBloomFilterMaxNumEntries(), diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieHFileWriter.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieHFileWriter.java index 352c51c..6747c4a 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieHFileWriter.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieHFileWriter.java @@ -99,13 +99,9 @@ public class HoodieHFileWriter<T extends HoodieRecordPayload, R extends IndexedR @Override public void writeAvroWithMetadata(R avroRecord, HoodieRecord record) throws IOException { - String seqId = - HoodieRecord.generateSequenceId(instantTime, taskContextSupplier.getPartitionIdSupplier().get(), recordIndex.getAndIncrement()); - HoodieAvroUtils.addHoodieKeyToRecord((GenericRecord) avroRecord, record.getRecordKey(), record.getPartitionPath(), - file.getName()); - HoodieAvroUtils.addCommitMetadataToRecord((GenericRecord) avroRecord, instantTime, seqId); - - writeAvro(record.getRecordKey(), (IndexedRecord)avroRecord); + prepRecordWithMetadata(avroRecord, record, instantTime, + taskContextSupplier.getPartitionIdSupplier().get(), recordIndex, file.getName()); + writeAvro(record.getRecordKey(), (IndexedRecord) avroRecord); } @Override diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieOrcConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieOrcConfig.java new file mode 100644 index 0000000..c45e024 --- /dev/null +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieOrcConfig.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi.io.storage; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hudi.common.bloom.BloomFilter; +import org.apache.orc.CompressionKind; + +public class HoodieOrcConfig { + static final String AVRO_SCHEMA_METADATA_KEY = "orc.avro.schema"; + + private final CompressionKind compressionKind; + private final int stripeSize; + private final int blockSize; + private final long maxFileSize; + private final Configuration hadoopConf; + private final BloomFilter bloomFilter; + + public HoodieOrcConfig(Configuration hadoopConf, CompressionKind compressionKind, int stripeSize, + int blockSize, long maxFileSize, BloomFilter bloomFilter) { + this.hadoopConf = hadoopConf; + this.compressionKind = compressionKind; + this.stripeSize = stripeSize; + this.blockSize = blockSize; + this.maxFileSize = maxFileSize; + this.bloomFilter = bloomFilter; + } + + public Configuration getHadoopConf() { + return hadoopConf; + } + + public CompressionKind getCompressionKind() { + return compressionKind; + } + + public int getStripeSize() { + return stripeSize; + } + + public int getBlockSize() { + return blockSize; + } + + public long getMaxFileSize() { + return maxFileSize; + } + + public boolean useBloomFilter() { + return bloomFilter != null; + } + + public BloomFilter getBloomFilter() { + return bloomFilter; + } +} diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieOrcWriter.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieOrcWriter.java new file mode 100644 index 0000000..f076842 --- /dev/null +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieOrcWriter.java @@ -0,0 +1,172 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi.io.storage; + +import static org.apache.hudi.avro.HoodieAvroWriteSupport.HOODIE_AVRO_BLOOM_FILTER_METADATA_KEY; +import static org.apache.hudi.avro.HoodieAvroWriteSupport.HOODIE_BLOOM_FILTER_TYPE_CODE; +import static org.apache.hudi.avro.HoodieAvroWriteSupport.HOODIE_MAX_RECORD_KEY_FOOTER; +import static org.apache.hudi.avro.HoodieAvroWriteSupport.HOODIE_MIN_RECORD_KEY_FOOTER; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.List; +import java.util.concurrent.atomic.AtomicLong; +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.generic.IndexedRecord; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.orc.storage.ql.exec.vector.ColumnVector; +import org.apache.orc.storage.ql.exec.vector.VectorizedRowBatch; +import org.apache.hudi.common.bloom.BloomFilter; +import org.apache.hudi.common.bloom.HoodieDynamicBoundedBloomFilter; +import org.apache.orc.OrcFile; +import org.apache.orc.TypeDescription; +import org.apache.orc.Writer; +import org.apache.hudi.common.engine.TaskContextSupplier; +import org.apache.hudi.common.fs.FSUtils; +import org.apache.hudi.common.fs.HoodieWrapperFileSystem; +import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.model.HoodieRecordPayload; +import org.apache.hudi.common.util.AvroOrcUtils; + +public class HoodieOrcWriter<T extends HoodieRecordPayload, R extends IndexedRecord> + implements HoodieFileWriter<R> { + private static final AtomicLong RECORD_INDEX = new AtomicLong(1); + + private final long maxFileSize; + private final Schema avroSchema; + private final List<TypeDescription> fieldTypes; + private final List<String> fieldNames; + private final VectorizedRowBatch batch; + private final Writer writer; + + private final Path file; + private final HoodieWrapperFileSystem fs; + private final String instantTime; + private final TaskContextSupplier taskContextSupplier; + + private HoodieOrcConfig orcConfig; + private String minRecordKey; + private String maxRecordKey; + + public HoodieOrcWriter(String instantTime, Path file, HoodieOrcConfig config, Schema schema, + TaskContextSupplier taskContextSupplier) throws IOException { + + Configuration conf = FSUtils.registerFileSystem(file, config.getHadoopConf()); + this.file = HoodieWrapperFileSystem.convertToHoodiePath(file, conf); + this.fs = (HoodieWrapperFileSystem) this.file.getFileSystem(conf); + this.instantTime = instantTime; + this.taskContextSupplier = taskContextSupplier; + + this.avroSchema = schema; + final TypeDescription orcSchema = AvroOrcUtils.createOrcSchema(avroSchema); + this.fieldTypes = orcSchema.getChildren(); + this.fieldNames = orcSchema.getFieldNames(); + this.maxFileSize = config.getMaxFileSize(); + this.batch = orcSchema.createRowBatch(); + OrcFile.WriterOptions writerOptions = OrcFile.writerOptions(conf) + .blockSize(config.getBlockSize()) + .stripeSize(config.getStripeSize()) + .compress(config.getCompressionKind()) + .bufferSize(config.getBlockSize()) + .fileSystem(fs) + .setSchema(orcSchema); + this.writer = OrcFile.createWriter(this.file, writerOptions); + this.orcConfig = config; + } + + @Override + public void writeAvroWithMetadata(R avroRecord, HoodieRecord record) throws IOException { + prepRecordWithMetadata(avroRecord, record, instantTime, + taskContextSupplier.getPartitionIdSupplier().get(), RECORD_INDEX, file.getName()); + writeAvro(record.getRecordKey(), avroRecord); + } + + @Override + public 
boolean canWrite() { + return fs.getBytesWritten(file) < maxFileSize; + } + + @Override + public void writeAvro(String recordKey, IndexedRecord object) throws IOException { + for (int col = 0; col < batch.numCols; col++) { + ColumnVector colVector = batch.cols[col]; + final String thisField = fieldNames.get(col); + final TypeDescription type = fieldTypes.get(col); + + Object fieldValue = ((GenericRecord) object).get(thisField); + Schema.Field avroField = avroSchema.getField(thisField); + AvroOrcUtils.addToVector(type, colVector, avroField.schema(), fieldValue, batch.size); + } + + batch.size++; + + // Batch size corresponds to the number of written rows out of 1024 total rows (by default) + // in the row batch, add the batch to file once all rows are filled and reset. + if (batch.size == batch.getMaxSize()) { + writer.addRowBatch(batch); + batch.reset(); + batch.size = 0; + } + + if (orcConfig.useBloomFilter()) { + orcConfig.getBloomFilter().add(recordKey); + if (minRecordKey != null) { + minRecordKey = minRecordKey.compareTo(recordKey) <= 0 ? minRecordKey : recordKey; + } else { + minRecordKey = recordKey; + } + + if (maxRecordKey != null) { + maxRecordKey = maxRecordKey.compareTo(recordKey) >= 0 ? maxRecordKey : recordKey; + } else { + maxRecordKey = recordKey; + } + } + } + + @Override + public void close() throws IOException { + if (batch.size != 0) { + writer.addRowBatch(batch); + batch.reset(); + } + + if (orcConfig.useBloomFilter()) { + final BloomFilter bloomFilter = orcConfig.getBloomFilter(); + writer.addUserMetadata(HOODIE_AVRO_BLOOM_FILTER_METADATA_KEY, ByteBuffer.wrap(bloomFilter.serializeToString().getBytes())); + if (minRecordKey != null && maxRecordKey != null) { + writer.addUserMetadata(HOODIE_MIN_RECORD_KEY_FOOTER, ByteBuffer.wrap(minRecordKey.getBytes())); + writer.addUserMetadata(HOODIE_MAX_RECORD_KEY_FOOTER, ByteBuffer.wrap(maxRecordKey.getBytes())); + } + if (bloomFilter.getBloomFilterTypeCode().name().contains(HoodieDynamicBoundedBloomFilter.TYPE_CODE_PREFIX)) { + writer.addUserMetadata(HOODIE_BLOOM_FILTER_TYPE_CODE, ByteBuffer.wrap(bloomFilter.getBloomFilterTypeCode().name().getBytes())); + } + } + writer.addUserMetadata(HoodieOrcConfig.AVRO_SCHEMA_METADATA_KEY, ByteBuffer.wrap(avroSchema.toString().getBytes())); + + writer.close(); + } + + @Override + public long getBytesWritten() { + return fs.getBytesWritten(file); + } +} diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieParquetWriter.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieParquetWriter.java index c3939d7..b6e77bc 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieParquetWriter.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieParquetWriter.java @@ -18,7 +18,6 @@ package org.apache.hudi.io.storage; -import org.apache.hudi.avro.HoodieAvroUtils; import org.apache.hudi.avro.HoodieAvroWriteSupport; import org.apache.hudi.common.engine.TaskContextSupplier; import org.apache.hudi.common.fs.FSUtils; @@ -27,7 +26,6 @@ import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieRecordPayload; import org.apache.avro.Schema; -import org.apache.avro.generic.GenericRecord; import org.apache.avro.generic.IndexedRecord; import org.apache.hadoop.fs.Path; import org.apache.parquet.hadoop.ParquetFileWriter; @@ -75,11 +73,8 @@ public class HoodieParquetWriter<T extends HoodieRecordPayload, R extends Indexe @Override public void 
writeAvroWithMetadata(R avroRecord, HoodieRecord record) throws IOException { - String seqId = - HoodieRecord.generateSequenceId(instantTime, taskContextSupplier.getPartitionIdSupplier().get(), recordIndex.getAndIncrement()); - HoodieAvroUtils.addHoodieKeyToRecord((GenericRecord) avroRecord, record.getRecordKey(), record.getPartitionPath(), - file.getName()); - HoodieAvroUtils.addCommitMetadataToRecord((GenericRecord) avroRecord, instantTime, seqId); + prepRecordWithMetadata(avroRecord, record, instantTime, + taskContextSupplier.getPartitionIdSupplier().get(), recordIndex, file.getName()); super.write(avroRecord); writeSupport.add(record.getRecordKey()); } diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java index 512518c..0ff2093 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java @@ -656,6 +656,7 @@ public abstract class HoodieTable<T extends HoodieRecordPayload, I, K, O> implem public HoodieLogBlockType getLogDataBlockFormat() { switch (getBaseFileFormat()) { case PARQUET: + case ORC: return HoodieLogBlockType.AVRO_DATA_BLOCK; case HFILE: return HoodieLogBlockType.HFILE_DATA_BLOCK; diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/io/storage/TestHoodieOrcReaderWriter.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/io/storage/TestHoodieOrcReaderWriter.java new file mode 100644 index 0000000..d69bc70 --- /dev/null +++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/io/storage/TestHoodieOrcReaderWriter.java @@ -0,0 +1,261 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi.io.storage; + +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericData; +import org.apache.avro.generic.GenericRecord; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hudi.common.bloom.BloomFilter; +import org.apache.hudi.common.bloom.BloomFilterFactory; +import org.apache.hudi.common.bloom.BloomFilterTypeCode; +import org.apache.hudi.common.engine.TaskContextSupplier; +import org.apache.hudi.config.HoodieStorageConfig; +import org.apache.orc.CompressionKind; +import org.apache.orc.OrcFile; +import org.apache.orc.Reader; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.mockito.Mockito; + +import java.io.File; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.Map; + +import static org.apache.hudi.avro.HoodieAvroWriteSupport.HOODIE_AVRO_BLOOM_FILTER_METADATA_KEY; +import static org.apache.hudi.avro.HoodieAvroWriteSupport.HOODIE_MAX_RECORD_KEY_FOOTER; +import static org.apache.hudi.avro.HoodieAvroWriteSupport.HOODIE_MIN_RECORD_KEY_FOOTER; +import static org.apache.hudi.common.testutils.SchemaTestUtil.getSchemaFromResource; +import static org.apache.hudi.io.storage.HoodieOrcConfig.AVRO_SCHEMA_METADATA_KEY; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.api.Assertions.assertEquals; + +public class TestHoodieOrcReaderWriter { + private final Path filePath = new Path(System.getProperty("java.io.tmpdir") + "/f1_1-0-1_000.orc"); + + @BeforeEach + @AfterEach + public void clearTempFile() { + File file = new File(filePath.toString()); + if (file.exists()) { + file.delete(); + } + } + + private HoodieOrcWriter createOrcWriter(Schema avroSchema) throws Exception { + BloomFilter filter = BloomFilterFactory.createBloomFilter(1000, 0.00001, -1, BloomFilterTypeCode.SIMPLE.name()); + Configuration conf = new Configuration(); + int orcStripSize = Integer.parseInt(HoodieStorageConfig.DEFAULT_ORC_STRIPE_SIZE); + int orcBlockSize = Integer.parseInt(HoodieStorageConfig.DEFAULT_ORC_BLOCK_SIZE); + int maxFileSize = Integer.parseInt(HoodieStorageConfig.DEFAULT_ORC_FILE_MAX_BYTES); + HoodieOrcConfig config = new HoodieOrcConfig(conf, CompressionKind.ZLIB, orcStripSize, orcBlockSize, maxFileSize, filter); + TaskContextSupplier mockTaskContextSupplier = Mockito.mock(TaskContextSupplier.class); + String instantTime = "000"; + return new HoodieOrcWriter(instantTime, filePath, config, avroSchema, mockTaskContextSupplier); + } + + @Test + public void testWriteReadMetadata() throws Exception { + Schema avroSchema = getSchemaFromResource(TestHoodieOrcReaderWriter.class, "/exampleSchema.avsc"); + HoodieOrcWriter writer = createOrcWriter(avroSchema); + for (int i = 0; i < 3; i++) { + GenericRecord record = new GenericData.Record(avroSchema); + record.put("_row_key", "key" + i); + record.put("time", Integer.toString(i)); + record.put("number", i); + writer.writeAvro("key" + i, record); + } + writer.close(); + + Configuration conf = new Configuration(); + Reader orcReader = OrcFile.createReader(filePath, OrcFile.readerOptions(conf)); + assertEquals(4, orcReader.getMetadataKeys().size()); + assertTrue(orcReader.getMetadataKeys().contains(HOODIE_MIN_RECORD_KEY_FOOTER)); + 
assertTrue(orcReader.getMetadataKeys().contains(HOODIE_MAX_RECORD_KEY_FOOTER)); + assertTrue(orcReader.getMetadataKeys().contains(HOODIE_AVRO_BLOOM_FILTER_METADATA_KEY)); + assertTrue(orcReader.getMetadataKeys().contains(AVRO_SCHEMA_METADATA_KEY)); + assertEquals(CompressionKind.ZLIB.name(), orcReader.getCompressionKind().toString()); + + HoodieFileReader<GenericRecord> hoodieReader = HoodieFileReaderFactory.getFileReader(conf, filePath); + BloomFilter filter = hoodieReader.readBloomFilter(); + for (int i = 0; i < 3; i++) { + assertTrue(filter.mightContain("key" + i)); + } + assertFalse(filter.mightContain("non-existent-key")); + assertEquals(3, hoodieReader.getTotalRecords()); + String[] minMaxRecordKeys = hoodieReader.readMinMaxRecordKeys(); + assertEquals(2, minMaxRecordKeys.length); + assertEquals("key0", minMaxRecordKeys[0]); + assertEquals("key2", minMaxRecordKeys[1]); + } + + @Test + public void testWriteReadPrimitiveRecord() throws Exception { + Schema avroSchema = getSchemaFromResource(TestHoodieOrcReaderWriter.class, "/exampleSchema.avsc"); + HoodieOrcWriter writer = createOrcWriter(avroSchema); + for (int i = 0; i < 3; i++) { + GenericRecord record = new GenericData.Record(avroSchema); + record.put("_row_key", "key" + i); + record.put("time", Integer.toString(i)); + record.put("number", i); + writer.writeAvro("key" + i, record); + } + writer.close(); + + Configuration conf = new Configuration(); + Reader orcReader = OrcFile.createReader(filePath, OrcFile.readerOptions(conf)); + assertEquals("struct<_row_key:string,time:string,number:int>", orcReader.getSchema().toString()); + assertEquals(3, orcReader.getNumberOfRows()); + + HoodieFileReader<GenericRecord> hoodieReader = HoodieFileReaderFactory.getFileReader(conf, filePath); + Iterator<GenericRecord> iter = hoodieReader.getRecordIterator(); + int index = 0; + while (iter.hasNext()) { + GenericRecord record = iter.next(); + assertEquals("key" + index, record.get("_row_key").toString()); + assertEquals(Integer.toString(index), record.get("time").toString()); + assertEquals(index, record.get("number")); + index++; + } + } + + @Test + public void testWriteReadComplexRecord() throws Exception { + Schema avroSchema = getSchemaFromResource(TestHoodieOrcReaderWriter.class, "/exampleSchemaWithUDT.avsc"); + Schema udtSchema = avroSchema.getField("driver").schema().getTypes().get(1); + HoodieOrcWriter writer = createOrcWriter(avroSchema); + for (int i = 0; i < 3; i++) { + GenericRecord record = new GenericData.Record(avroSchema); + record.put("_row_key", "key" + i); + record.put("time", Integer.toString(i)); + record.put("number", i); + GenericRecord innerRecord = new GenericData.Record(udtSchema); + innerRecord.put("driver_name", "driver" + i); + innerRecord.put("list", Collections.singletonList(i)); + innerRecord.put("map", Collections.singletonMap("key" + i, "value" + i)); + record.put("driver", innerRecord); + writer.writeAvro("key" + i, record); + } + writer.close(); + + Configuration conf = new Configuration(); + Reader reader = OrcFile.createReader(filePath, OrcFile.readerOptions(conf)); + assertEquals("struct<_row_key:string,time:string,number:int,driver:struct<driver_name:string,list:array<int>,map:map<string,string>>>", + reader.getSchema().toString()); + assertEquals(3, reader.getNumberOfRows()); + + HoodieFileReader<GenericRecord> hoodieReader = HoodieFileReaderFactory.getFileReader(conf, filePath); + Iterator<GenericRecord> iter = hoodieReader.getRecordIterator(); + int index = 0; + while (iter.hasNext()) { + GenericRecord 
record = iter.next(); + assertEquals("key" + index, record.get("_row_key").toString()); + assertEquals(Integer.toString(index), record.get("time").toString()); + assertEquals(index, record.get("number")); + GenericRecord innerRecord = (GenericRecord) record.get("driver"); + assertEquals("driver" + index, innerRecord.get("driver_name").toString()); + assertEquals(1, ((List<?>)innerRecord.get("list")).size()); + assertEquals(index, ((List<?>)innerRecord.get("list")).get(0)); + assertEquals("value" + index, ((Map<?,?>)innerRecord.get("map")).get("key" + index).toString()); + index++; + } + } + + @Test + public void testWriteReadWithEvolvedSchema() throws Exception { + Schema avroSchema = getSchemaFromResource(TestHoodieOrcReaderWriter.class, "/exampleSchema.avsc"); + HoodieOrcWriter writer = createOrcWriter(avroSchema); + for (int i = 0; i < 3; i++) { + GenericRecord record = new GenericData.Record(avroSchema); + record.put("_row_key", "key" + i); + record.put("time", Integer.toString(i)); + record.put("number", i); + writer.writeAvro("key" + i, record); + } + writer.close(); + + Configuration conf = new Configuration(); + HoodieFileReader<GenericRecord> hoodieReader = HoodieFileReaderFactory.getFileReader(conf, filePath); + Schema evolvedSchema = getSchemaFromResource(TestHoodieOrcReaderWriter.class, "/exampleEvolvedSchema.avsc"); + Iterator<GenericRecord> iter = hoodieReader.getRecordIterator(evolvedSchema); + int index = 0; + while (iter.hasNext()) { + GenericRecord record = iter.next(); + assertEquals("key" + index, record.get("_row_key").toString()); + assertEquals(Integer.toString(index), record.get("time").toString()); + assertEquals(index, record.get("number")); + assertNull(record.get("added_field")); + index++; + } + + evolvedSchema = getSchemaFromResource(TestHoodieOrcReaderWriter.class, "/exampleEvolvedSchemaChangeOrder.avsc"); + iter = hoodieReader.getRecordIterator(evolvedSchema); + index = 0; + while (iter.hasNext()) { + GenericRecord record = iter.next(); + assertEquals("key" + index, record.get("_row_key").toString()); + assertEquals(Integer.toString(index), record.get("time").toString()); + assertEquals(index, record.get("number")); + assertNull(record.get("added_field")); + index++; + } + + evolvedSchema = getSchemaFromResource(TestHoodieOrcReaderWriter.class, "/exampleEvolvedSchemaColumnRequire.avsc"); + iter = hoodieReader.getRecordIterator(evolvedSchema); + index = 0; + while (iter.hasNext()) { + GenericRecord record = iter.next(); + assertEquals("key" + index, record.get("_row_key").toString()); + assertEquals(Integer.toString(index), record.get("time").toString()); + assertEquals(index, record.get("number")); + assertNull(record.get("added_field")); + index++; + } + + evolvedSchema = getSchemaFromResource(TestHoodieOrcReaderWriter.class, "/exampleEvolvedSchemaColumnType.avsc"); + iter = hoodieReader.getRecordIterator(evolvedSchema); + index = 0; + while (iter.hasNext()) { + GenericRecord record = iter.next(); + assertEquals("key" + index, record.get("_row_key").toString()); + assertEquals(Integer.toString(index), record.get("time").toString()); + assertEquals(Integer.toString(index), record.get("number").toString()); + assertNull(record.get("added_field")); + index++; + } + + evolvedSchema = getSchemaFromResource(TestHoodieOrcReaderWriter.class, "/exampleEvolvedSchemaDeleteColumn.avsc"); + iter = hoodieReader.getRecordIterator(evolvedSchema); + index = 0; + while (iter.hasNext()) { + GenericRecord record = iter.next(); + assertEquals("key" + index, 
record.get("_row_key").toString()); + assertEquals(Integer.toString(index), record.get("time").toString()); + assertNull(record.get("number")); + assertNull(record.get("added_field")); + index++; + } + } +} diff --git a/hudi-client/hudi-client-common/src/test/resources/exampleSchemaWithUDT.avsc b/hudi-client/hudi-client-common/src/test/resources/exampleSchemaWithUDT.avsc new file mode 100644 index 0000000..4c40fb2 --- /dev/null +++ b/hudi-client/hudi-client-common/src/test/resources/exampleSchemaWithUDT.avsc @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +{ + "namespace": "example.schema", + "type": "record", + "name": "trip", + "fields": [ + { + "name": "_row_key", + "type": "string" + }, + { + "name": "time", + "type": "string" + }, + { + "name": "number", + "type": ["null", "int"] + }, + { + "name": "driver", + "type": [ + "null", + { + "name": "person", + "type": "record", + "fields": [ + { + "default": null, + "name": "driver_name", + "type": ["null", "string"] + }, + { + "name": "list", + "type": { + "type": "array", + "items": "int" + } + }, + { + "name": "map", + "type": { + "type": "map", + "values": "string" + } + } + ] + } + ] + } + ] +} diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/storage/TestHoodieFileWriterFactory.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/storage/TestHoodieFileWriterFactory.java index 26f431a..b7f34ab 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/storage/TestHoodieFileWriterFactory.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/storage/TestHoodieFileWriterFactory.java @@ -51,11 +51,18 @@ public class TestHoodieFileWriterFactory extends HoodieClientTestBase { parquetPath, table, cfg, HoodieTestDataGenerator.AVRO_SCHEMA, supplier); assertTrue(parquetWriter instanceof HoodieParquetWriter); + // hfile format. final Path hfilePath = new Path(basePath + "/partition/path/f1_1-0-1_000.hfile"); HoodieFileWriter<IndexedRecord> hfileWriter = HoodieFileWriterFactory.getFileWriter(instantTime, hfilePath, table, cfg, HoodieTestDataGenerator.AVRO_SCHEMA, supplier); assertTrue(hfileWriter instanceof HoodieHFileWriter); + // orc file format. + final Path orcPath = new Path(basePath + "/partition/path/f1_1-0-1_000.orc"); + HoodieFileWriter<IndexedRecord> orcFileWriter = HoodieFileWriterFactory.getFileWriter(instantTime, + orcPath, table, cfg, HoodieTestDataGenerator.AVRO_SCHEMA, supplier); + assertTrue(orcFileWriter instanceof HoodieOrcWriter); + // other file format exception. 
final Path logPath = new Path(basePath + "/partition/path/f.b51192a8-574b-4a85-b246-bcfec03ac8bf_100.log.2_1-0-1"); final Throwable thrown = assertThrows(UnsupportedOperationException.class, () -> { diff --git a/hudi-common/pom.xml b/hudi-common/pom.xml index 6ec0d95..a41e73e 100644 --- a/hudi-common/pom.xml +++ b/hudi-common/pom.xml @@ -119,6 +119,14 @@ <artifactId>parquet-avro</artifactId> </dependency> + <!-- ORC --> + <dependency> + <groupId>org.apache.orc</groupId> + <artifactId>orc-core</artifactId> + <version>${orc.version}</version> + <classifier>nohive</classifier> + </dependency> + <!-- Httpcomponents --> <dependency> <groupId>org.apache.httpcomponents</groupId> diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieFileFormat.java b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieFileFormat.java index 552c38f..f7fdcd0 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieFileFormat.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieFileFormat.java @@ -24,7 +24,8 @@ package org.apache.hudi.common.model; public enum HoodieFileFormat { PARQUET(".parquet"), HOODIE_LOG(".log"), - HFILE(".hfile"); + HFILE(".hfile"), + ORC(".orc"); private final String extension; diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/AvroOrcUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/util/AvroOrcUtils.java new file mode 100644 index 0000000..0f1f49f --- /dev/null +++ b/hudi-common/src/main/java/org/apache/hudi/common/util/AvroOrcUtils.java @@ -0,0 +1,799 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi.common.util; + +import java.math.BigDecimal; +import java.math.BigInteger; +import java.nio.ByteBuffer; +import java.sql.Timestamp; +import java.util.Base64; +import java.util.Date; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; +import org.apache.avro.Conversions; +import org.apache.avro.LogicalType; +import org.apache.avro.LogicalTypes; +import org.apache.avro.Schema.Field; +import org.apache.avro.generic.GenericData; +import java.nio.charset.StandardCharsets; +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericData.StringType; +import org.apache.avro.util.Utf8; +import org.apache.orc.storage.common.type.HiveDecimal; +import org.apache.orc.storage.ql.exec.vector.BytesColumnVector; +import org.apache.orc.storage.ql.exec.vector.ColumnVector; +import org.apache.orc.storage.ql.exec.vector.DecimalColumnVector; +import org.apache.orc.storage.ql.exec.vector.DoubleColumnVector; +import org.apache.orc.storage.ql.exec.vector.ListColumnVector; +import org.apache.orc.storage.ql.exec.vector.LongColumnVector; +import org.apache.orc.storage.ql.exec.vector.MapColumnVector; +import org.apache.orc.storage.ql.exec.vector.StructColumnVector; +import org.apache.orc.storage.ql.exec.vector.TimestampColumnVector; +import org.apache.orc.storage.ql.exec.vector.UnionColumnVector; +import org.apache.orc.storage.serde2.io.DateWritable; +import org.apache.hudi.exception.HoodieIOException; +import org.apache.orc.TypeDescription; + +/** + * Methods including addToVector, addUnionValue, createOrcSchema are originally from + * https://github.com/streamsets/datacollector. + * Source classes: + * - com.streamsets.pipeline.lib.util.avroorc.AvroToOrcRecordConverter + * - com.streamsets.pipeline.lib.util.avroorc.AvroToOrcSchemaConverter + * + * Changes made: + * 1. Flatten nullable Avro schema type when the value is not null in `addToVector`. + * 2. Use getLogicalType(), constants from LogicalTypes instead of getJsonProp() to handle Avro logical types. + */ +public class AvroOrcUtils { + + private static final int MICROS_PER_MILLI = 1000; + private static final int NANOS_PER_MICRO = 1000; + + /** + * Add an object (of a given ORC type) to the column vector at a given position. + * + * @param type ORC schema of the value Object. + * @param colVector The column vector to store the value Object. + * @param avroSchema Avro schema of the value Object. + * Only used to check logical types for timestamp unit conversion. + * @param value Object to be added to the column vector + * @param vectorPos The position in the vector where value will be stored at. + */ + public static void addToVector(TypeDescription type, ColumnVector colVector, Schema avroSchema, Object value, int vectorPos) { + + final int currentVecLength = colVector.isNull.length; + if (vectorPos >= currentVecLength) { + colVector.ensureSize(2 * currentVecLength, true); + } + if (value == null) { + colVector.isNull[vectorPos] = true; + colVector.noNulls = false; + return; + } + + if (avroSchema.getType().equals(Schema.Type.UNION)) { + avroSchema = getActualSchemaType(avroSchema); + } + + LogicalType logicalType = avroSchema != null ? avroSchema.getLogicalType() : null; + + switch (type.getCategory()) { + case BOOLEAN: + LongColumnVector boolVec = (LongColumnVector) colVector; + boolVec.vector[vectorPos] = (boolean) value ? 
1 : 0; + break; + case BYTE: + LongColumnVector byteColVec = (LongColumnVector) colVector; + byteColVec.vector[vectorPos] = (byte) value; + break; + case SHORT: + LongColumnVector shortColVec = (LongColumnVector) colVector; + shortColVec.vector[vectorPos] = (short) value; + break; + case INT: + // the Avro logical type could be AvroTypeUtil.LOGICAL_TYPE_TIME_MILLIS, but we will ignore that fact here + // since Orc has no way to represent a time in the way Avro defines it; we will simply preserve the int value + LongColumnVector intColVec = (LongColumnVector) colVector; + intColVec.vector[vectorPos] = (int) value; + break; + case LONG: + // the Avro logical type could be AvroTypeUtil.LOGICAL_TYPE_TIME_MICROS, but we will ignore that fact here + // since Orc has no way to represent a time in the way Avro defines it; we will simply preserve the long value + LongColumnVector longColVec = (LongColumnVector) colVector; + longColVec.vector[vectorPos] = (long) value; + break; + case FLOAT: + DoubleColumnVector floatColVec = (DoubleColumnVector) colVector; + floatColVec.vector[vectorPos] = (float) value; + break; + case DOUBLE: + DoubleColumnVector doubleColVec = (DoubleColumnVector) colVector; + doubleColVec.vector[vectorPos] = (double) value; + break; + case VARCHAR: + case CHAR: + case STRING: + BytesColumnVector bytesColVec = (BytesColumnVector) colVector; + byte[] bytes = null; + + if (value instanceof String) { + bytes = ((String) value).getBytes(StandardCharsets.UTF_8); + } else if (value instanceof Utf8) { + final Utf8 utf8 = (Utf8) value; + bytes = utf8.getBytes(); + } else if (value instanceof GenericData.EnumSymbol) { + bytes = ((GenericData.EnumSymbol) value).toString().getBytes(StandardCharsets.UTF_8); + } else { + throw new IllegalStateException(String.format( + "Unrecognized type for Avro %s field value, which has type %s, value %s", + type.getCategory().getName(), + value.getClass().getName(), + value.toString() + )); + } + + if (bytes == null) { + bytesColVec.isNull[vectorPos] = true; + bytesColVec.noNulls = false; + } else { + bytesColVec.setRef(vectorPos, bytes, 0, bytes.length); + } + break; + case DATE: + LongColumnVector dateColVec = (LongColumnVector) colVector; + int daysSinceEpoch; + if (logicalType instanceof LogicalTypes.Date) { + daysSinceEpoch = (int) value; + } else if (value instanceof java.sql.Date) { + daysSinceEpoch = DateWritable.dateToDays((java.sql.Date) value); + } else if (value instanceof Date) { + daysSinceEpoch = DateWritable.millisToDays(((Date) value).getTime()); + } else { + throw new IllegalStateException(String.format( + "Unrecognized type for Avro DATE field value, which has type %s, value %s", + value.getClass().getName(), + value.toString() + )); + } + dateColVec.vector[vectorPos] = daysSinceEpoch; + break; + case TIMESTAMP: + TimestampColumnVector tsColVec = (TimestampColumnVector) colVector; + + long time; + int nanos = 0; + + // The unit for Timestamp in ORC is millis, convert timestamp to millis if needed + if (logicalType instanceof LogicalTypes.TimestampMillis) { + time = (long) value; + } else if (logicalType instanceof LogicalTypes.TimestampMicros) { + final long logicalTsValue = (long) value; + time = logicalTsValue / MICROS_PER_MILLI; + nanos = NANOS_PER_MICRO * ((int) (logicalTsValue % MICROS_PER_MILLI)); + } else if (value instanceof Timestamp) { + Timestamp tsValue = (Timestamp) value; + time = tsValue.getTime(); + nanos = tsValue.getNanos(); + } else if (value instanceof java.sql.Date) { + java.sql.Date sqlDateValue = (java.sql.Date) 
value; + time = sqlDateValue.getTime(); + } else if (value instanceof Date) { + Date dateValue = (Date) value; + time = dateValue.getTime(); + } else { + throw new IllegalStateException(String.format( + "Unrecognized type for Avro TIMESTAMP field value, which has type %s, value %s", + value.getClass().getName(), + value.toString() + )); + } + + tsColVec.time[vectorPos] = time; + tsColVec.nanos[vectorPos] = nanos; + break; + case BINARY: + BytesColumnVector binaryColVec = (BytesColumnVector) colVector; + + byte[] binaryBytes; + if (value instanceof GenericData.Fixed) { + binaryBytes = ((GenericData.Fixed)value).bytes(); + } else if (value instanceof ByteBuffer) { + final ByteBuffer byteBuffer = (ByteBuffer) value; + binaryBytes = new byte[byteBuffer.remaining()]; + byteBuffer.get(binaryBytes); + } else if (value instanceof byte[]) { + binaryBytes = (byte[]) value; + } else { + throw new IllegalStateException(String.format( + "Unrecognized type for Avro BINARY field value, which has type %s, value %s", + value.getClass().getName(), + value.toString() + )); + } + binaryColVec.setRef(vectorPos, binaryBytes, 0, binaryBytes.length); + break; + case DECIMAL: + DecimalColumnVector decimalColVec = (DecimalColumnVector) colVector; + HiveDecimal decimalValue; + if (value instanceof BigDecimal) { + final BigDecimal decimal = (BigDecimal) value; + decimalValue = HiveDecimal.create(decimal); + } else if (value instanceof ByteBuffer) { + final ByteBuffer byteBuffer = (ByteBuffer) value; + final byte[] decimalBytes = new byte[byteBuffer.remaining()]; + byteBuffer.get(decimalBytes); + final BigInteger bigInt = new BigInteger(decimalBytes); + final int scale = type.getScale(); + BigDecimal bigDecVal = new BigDecimal(bigInt, scale); + + decimalValue = HiveDecimal.create(bigDecVal); + if (decimalValue == null && decimalBytes.length > 0) { + throw new IllegalStateException( + "Unexpected read null HiveDecimal from bytes (base-64 encoded): " + + Base64.getEncoder().encodeToString(decimalBytes) + ); + } + } else if (value instanceof GenericData.Fixed) { + final BigDecimal decimal = new Conversions.DecimalConversion() + .fromFixed((GenericData.Fixed) value, avroSchema, logicalType); + decimalValue = HiveDecimal.create(decimal); + } else { + throw new IllegalStateException(String.format( + "Unexpected type for decimal (%s), cannot convert from Avro value", + value.getClass().getCanonicalName() + )); + } + if (decimalValue == null) { + decimalColVec.isNull[vectorPos] = true; + decimalColVec.noNulls = false; + } else { + decimalColVec.set(vectorPos, decimalValue); + } + break; + case LIST: + List<?> list = (List<?>) value; + ListColumnVector listColVec = (ListColumnVector) colVector; + listColVec.offsets[vectorPos] = listColVec.childCount; + listColVec.lengths[vectorPos] = list.size(); + + TypeDescription listType = type.getChildren().get(0); + for (Object listItem : list) { + addToVector(listType, listColVec.child, avroSchema.getElementType(), listItem, listColVec.childCount++); + } + break; + case MAP: + Map<String, ?> mapValue = (Map<String, ?>) value; + + MapColumnVector mapColumnVector = (MapColumnVector) colVector; + mapColumnVector.offsets[vectorPos] = mapColumnVector.childCount; + mapColumnVector.lengths[vectorPos] = mapValue.size(); + + // keys are always strings + Schema keySchema = Schema.create(Schema.Type.STRING); + for (Map.Entry<String, ?> entry : mapValue.entrySet()) { + addToVector( + type.getChildren().get(0), + mapColumnVector.keys, + keySchema, + entry.getKey(), + mapColumnVector.childCount + ); 
+ + addToVector( + type.getChildren().get(1), + mapColumnVector.values, + avroSchema.getValueType(), + entry.getValue(), + mapColumnVector.childCount + ); + + mapColumnVector.childCount++; + } + + break; + case STRUCT: + StructColumnVector structColVec = (StructColumnVector) colVector; + + GenericData.Record record = (GenericData.Record) value; + + for (int i = 0; i < type.getFieldNames().size(); i++) { + String fieldName = type.getFieldNames().get(i); + Object fieldValue = record.get(fieldName); + TypeDescription fieldType = type.getChildren().get(i); + addToVector(fieldType, structColVec.fields[i], avroSchema.getFields().get(i).schema(), fieldValue, vectorPos); + } + + break; + case UNION: + UnionColumnVector unionColVec = (UnionColumnVector) colVector; + + List<TypeDescription> childTypes = type.getChildren(); + boolean added = addUnionValue(unionColVec, childTypes, avroSchema, value, vectorPos); + + if (!added) { + throw new IllegalStateException(String.format( + "Failed to add value %s to union with type %s", + value == null ? "null" : value.toString(), + type.toString() + )); + } + + break; + default: + throw new IllegalArgumentException("Invalid TypeDescription " + type.toString() + "."); + } + } + + /** + * Match value with its ORC type and add to the union vector at a given position. + * + * @param unionVector The vector to store value. + * @param unionChildTypes All possible types for the value Object. + * @param avroSchema Avro union schema for the value Object. + * @param value Object to be added to the unionVector + * @param vectorPos The position in the vector where value will be stored at. + * @return succeeded or failed + */ + public static boolean addUnionValue( + UnionColumnVector unionVector, + List<TypeDescription> unionChildTypes, + Schema avroSchema, + Object value, + int vectorPos + ) { + int matchIndex = -1; + TypeDescription matchType = null; + Object matchValue = null; + + for (int t = 0; t < unionChildTypes.size(); t++) { + TypeDescription childType = unionChildTypes.get(t); + boolean matches = false; + + switch (childType.getCategory()) { + case BOOLEAN: + matches = value instanceof Boolean; + break; + case BYTE: + matches = value instanceof Byte; + break; + case SHORT: + matches = value instanceof Short; + break; + case INT: + matches = value instanceof Integer; + break; + case LONG: + matches = value instanceof Long; + break; + case FLOAT: + matches = value instanceof Float; + break; + case DOUBLE: + matches = value instanceof Double; + break; + case STRING: + case VARCHAR: + case CHAR: + if (value instanceof String) { + matches = true; + matchValue = ((String) value).getBytes(StandardCharsets.UTF_8); + } else if (value instanceof Utf8) { + matches = true; + matchValue = ((Utf8) value).getBytes(); + } + break; + case DATE: + matches = value instanceof Date; + break; + case TIMESTAMP: + matches = value instanceof Timestamp; + break; + case BINARY: + matches = value instanceof byte[] || value instanceof GenericData.Fixed; + break; + case DECIMAL: + matches = value instanceof BigDecimal; + break; + case LIST: + matches = value instanceof List; + break; + case MAP: + matches = value instanceof Map; + break; + case STRUCT: + throw new UnsupportedOperationException("Cannot handle STRUCT within UNION."); + case UNION: + List<TypeDescription> children = childType.getChildren(); + if (value == null) { + matches = children == null || children.size() == 0; + } else { + matches = addUnionValue(unionVector, children, avroSchema, value, vectorPos); + } + break; + default: + 
throw new IllegalArgumentException("Invalid TypeDescription " + childType.getCategory().toString() + "."); + } + + if (matches) { + matchIndex = t; + matchType = childType; + break; + } + } + + if (value == null && matchValue != null) { + value = matchValue; + } + + if (matchIndex >= 0) { + unionVector.tags[vectorPos] = matchIndex; + if (value == null) { + unionVector.isNull[vectorPos] = true; + unionVector.noNulls = false; + } else { + addToVector(matchType, unionVector.fields[matchIndex], avroSchema.getTypes().get(matchIndex), value, vectorPos); + } + return true; + } else { + return false; + } + } + + /** + * Read the Column vector at a given position conforming to a given ORC schema. + * + * @param type ORC schema of the object to read. + * @param colVector The column vector to read. + * @param avroSchema Avro schema of the object to read. + * Only used to check logical types for timestamp unit conversion. + * @param vectorPos The position in the vector where the value to read is stored at. + * @return The object being read. + */ + public static Object readFromVector(TypeDescription type, ColumnVector colVector, Schema avroSchema, int vectorPos) { + + if (colVector.isRepeating) { + vectorPos = 0; + } + + if (colVector.isNull[vectorPos]) { + return null; + } + + if (avroSchema.getType().equals(Schema.Type.UNION)) { + avroSchema = getActualSchemaType(avroSchema); + } + LogicalType logicalType = avroSchema != null ? avroSchema.getLogicalType() : null; + + switch (type.getCategory()) { + case BOOLEAN: + return ((LongColumnVector) colVector).vector[vectorPos] != 0; + case BYTE: + return (byte) ((LongColumnVector) colVector).vector[vectorPos]; + case SHORT: + return (short) ((LongColumnVector) colVector).vector[vectorPos]; + case INT: + return (int) ((LongColumnVector) colVector).vector[vectorPos]; + case LONG: + return ((LongColumnVector) colVector).vector[vectorPos]; + case FLOAT: + return (float) ((DoubleColumnVector) colVector).vector[vectorPos]; + case DOUBLE: + return ((DoubleColumnVector) colVector).vector[vectorPos]; + case VARCHAR: + case CHAR: + int maxLength = type.getMaxLength(); + String result = ((BytesColumnVector) colVector).toString(vectorPos); + if (result.length() <= maxLength) { + return result; + } else { + throw new HoodieIOException("CHAR/VARCHAR has length " + result.length() + " greater than Max Length allowed"); + } + case STRING: + String stringType = avroSchema.getProp(GenericData.STRING_PROP); + if (stringType == null || !stringType.equals(StringType.String)) { + int stringLength = ((BytesColumnVector) colVector).length[vectorPos]; + int stringOffset = ((BytesColumnVector) colVector).start[vectorPos]; + byte[] stringBytes = new byte[stringLength]; + System.arraycopy(((BytesColumnVector) colVector).vector[vectorPos], stringOffset, stringBytes, 0, stringLength); + return new Utf8(stringBytes); + } else { + return ((BytesColumnVector) colVector).toString(vectorPos); + } + case DATE: + // convert to daysSinceEpoch for LogicalType.Date + return (int) ((LongColumnVector) colVector).vector[vectorPos]; + case TIMESTAMP: + // The unit of time in ORC is millis. 
Convert (time,nanos) to the desired unit per logicalType + long time = ((TimestampColumnVector) colVector).time[vectorPos]; + int nanos = ((TimestampColumnVector) colVector).nanos[vectorPos]; + if (logicalType instanceof LogicalTypes.TimestampMillis) { + return time; + } else if (logicalType instanceof LogicalTypes.TimestampMicros) { + return time * MICROS_PER_MILLI + nanos / NANOS_PER_MICRO; + } else { + return ((TimestampColumnVector) colVector).getTimestampAsLong(vectorPos); + } + case BINARY: + int binaryLength = ((BytesColumnVector) colVector).length[vectorPos]; + int binaryOffset = ((BytesColumnVector) colVector).start[vectorPos]; + byte[] binaryBytes = new byte[binaryLength]; + System.arraycopy(((BytesColumnVector) colVector).vector[vectorPos], binaryOffset, binaryBytes, 0, binaryLength); + // return a ByteBuffer to be consistent with AvroRecordConverter + return ByteBuffer.wrap(binaryBytes); + case DECIMAL: + // HiveDecimal always ignores trailing zeros, thus modifies the scale implicitly, + // therefore, the scale must be enforced here. + BigDecimal bigDecimal = ((DecimalColumnVector) colVector).vector[vectorPos] + .getHiveDecimal().bigDecimalValue() + .setScale(((LogicalTypes.Decimal) logicalType).getScale()); + Schema.Type baseType = avroSchema.getType(); + if (baseType.equals(Schema.Type.FIXED)) { + return new Conversions.DecimalConversion().toFixed(bigDecimal, avroSchema, logicalType); + } else if (baseType.equals(Schema.Type.BYTES)) { + return bigDecimal.unscaledValue().toByteArray(); + } else { + throw new HoodieIOException(baseType.getName() + "is not a valid type for LogicalTypes.DECIMAL."); + } + case LIST: + ArrayList<Object> list = new ArrayList<>(); + ListColumnVector listVector = (ListColumnVector) colVector; + int listLength = (int) listVector.lengths[vectorPos]; + int listOffset = (int) listVector.offsets[vectorPos]; + list.ensureCapacity(listLength); + TypeDescription childType = type.getChildren().get(0); + for (int i = 0; i < listLength; i++) { + list.add(readFromVector(childType, listVector.child, avroSchema.getElementType(), listOffset + i)); + } + return list; + case MAP: + Map<String, Object> map = new HashMap<String, Object>(); + MapColumnVector mapVector = (MapColumnVector) colVector; + int mapLength = (int) mapVector.lengths[vectorPos]; + int mapOffset = (int) mapVector.offsets[vectorPos]; + // keys are always strings for maps in Avro + Schema keySchema = Schema.create(Schema.Type.STRING); + for (int i = 0; i < mapLength; i++) { + map.put( + readFromVector(type.getChildren().get(0), mapVector.keys, keySchema, i + mapOffset).toString(), + readFromVector(type.getChildren().get(1), mapVector.values, + avroSchema.getValueType(), i + mapOffset)); + } + return map; + case STRUCT: + StructColumnVector structVector = (StructColumnVector) colVector; + List<TypeDescription> children = type.getChildren(); + GenericData.Record record = new GenericData.Record(avroSchema); + for (int i = 0; i < children.size(); i++) { + record.put(i, readFromVector(children.get(i), structVector.fields[i], + avroSchema.getFields().get(i).schema(), vectorPos)); + } + return record; + case UNION: + UnionColumnVector unionVector = (UnionColumnVector) colVector; + int tag = unionVector.tags[vectorPos]; + ColumnVector fieldVector = unionVector.fields[tag]; + return readFromVector(type.getChildren().get(tag), fieldVector, avroSchema.getTypes().get(tag), vectorPos); + default: + throw new HoodieIOException("Unrecognized TypeDescription " + type.toString()); + } + } + + public static 
TypeDescription createOrcSchema(Schema avroSchema) { + + LogicalType logicalType = avroSchema.getLogicalType(); + + if (logicalType != null) { + if (logicalType instanceof LogicalTypes.Decimal) { + return TypeDescription.createDecimal() + .withPrecision(((LogicalTypes.Decimal) logicalType).getPrecision()) + .withScale(((LogicalTypes.Decimal) logicalType).getScale()); + } else if (logicalType instanceof LogicalTypes.Date) { + // The date logical type represents a date within the calendar, with no reference to a particular time zone + // or time of day. + // + // A date logical type annotates an Avro int, where the int stores the number of days from the unix epoch, 1 + // January 1970 (ISO calendar). + return TypeDescription.createDate(); + } else if (logicalType instanceof LogicalTypes.TimeMillis) { + // The time-millis logical type represents a time of day, with no reference to a particular calendar, time + // zone or date, with a precision of one millisecond. + // + // A time-millis logical type annotates an Avro int, where the int stores the number of milliseconds after + // midnight, 00:00:00.000. + return TypeDescription.createInt(); + } else if (logicalType instanceof LogicalTypes.TimeMicros) { + // The time-micros logical type represents a time of day, with no reference to a particular calendar, time + // zone or date, with a precision of one microsecond. + // + // A time-micros logical type annotates an Avro long, where the long stores the number of microseconds after + // midnight, 00:00:00.000000. + return TypeDescription.createLong(); + } else if (logicalType instanceof LogicalTypes.TimestampMillis) { + // The timestamp-millis logical type represents an instant on the global timeline, independent of a + // particular time zone or calendar, with a precision of one millisecond. + // + // A timestamp-millis logical type annotates an Avro long, where the long stores the number of milliseconds + // from the unix epoch, 1 January 1970 00:00:00.000 UTC. + return TypeDescription.createTimestamp(); + } else if (logicalType instanceof LogicalTypes.TimestampMicros) { + // The timestamp-micros logical type represents an instant on the global timeline, independent of a + // particular time zone or calendar, with a precision of one microsecond. + // + // A timestamp-micros logical type annotates an Avro long, where the long stores the number of microseconds + // from the unix epoch, 1 January 1970 00:00:00.000000 UTC. 
+ return TypeDescription.createTimestamp(); + } + } + + final Schema.Type type = avroSchema.getType(); + switch (type) { + case NULL: + // empty union represents null type + final TypeDescription nullUnion = TypeDescription.createUnion(); + return nullUnion; + case LONG: + return TypeDescription.createLong(); + case INT: + return TypeDescription.createInt(); + case BYTES: + return TypeDescription.createBinary(); + case ARRAY: + return TypeDescription.createList(createOrcSchema(avroSchema.getElementType())); + case RECORD: + final TypeDescription recordStruct = TypeDescription.createStruct(); + for (Schema.Field field : avroSchema.getFields()) { + final Schema fieldSchema = field.schema(); + final TypeDescription fieldType = createOrcSchema(fieldSchema); + if (fieldType != null) { + recordStruct.addField(field.name(), fieldType); + } + } + return recordStruct; + case MAP: + return TypeDescription.createMap( + // in Avro maps, keys are always strings + TypeDescription.createString(), + createOrcSchema(avroSchema.getValueType()) + ); + case UNION: + final List<Schema> nonNullMembers = avroSchema.getTypes().stream().filter( + schema -> !Schema.Type.NULL.equals(schema.getType()) + ).collect(Collectors.toList()); + + if (nonNullMembers.isEmpty()) { + // no non-null union members; represent as an ORC empty union + return TypeDescription.createUnion(); + } else if (nonNullMembers.size() == 1) { + // a single non-null union member + // this is how Avro represents "nullable" types; as a union of the NULL type with another + // since ORC already supports nullability of all types, just use the child type directly + return createOrcSchema(nonNullMembers.get(0)); + } else { + // more than one non-null type; represent as an actual ORC union of them + final TypeDescription union = TypeDescription.createUnion(); + for (final Schema childSchema : nonNullMembers) { + union.addUnionChild(createOrcSchema(childSchema)); + } + return union; + } + case STRING: + return TypeDescription.createString(); + case FLOAT: + return TypeDescription.createFloat(); + case DOUBLE: + return TypeDescription.createDouble(); + case BOOLEAN: + return TypeDescription.createBoolean(); + case ENUM: + // represent as String for now + return TypeDescription.createString(); + case FIXED: + return TypeDescription.createBinary(); + default: + throw new IllegalStateException(String.format("Unrecognized Avro type: %s", type.getName())); + } + } + + public static Schema createAvroSchema(TypeDescription orcSchema) { + switch (orcSchema.getCategory()) { + case BOOLEAN: + return Schema.create(Schema.Type.BOOLEAN); + case BYTE: + // tinyint (8 bit), use int to hold it + return Schema.create(Schema.Type.INT); + case SHORT: + // smallint (16 bit), use int to hold it + return Schema.create(Schema.Type.INT); + case INT: + // the Avro logical type could be AvroTypeUtil.LOGICAL_TYPE_TIME_MILLIS, but there is no way to distinguish + return Schema.create(Schema.Type.INT); + case LONG: + // the Avro logical type could be AvroTypeUtil.LOGICAL_TYPE_TIME_MICROS, but there is no way to distinguish + return Schema.create(Schema.Type.LONG); + case FLOAT: + return Schema.create(Schema.Type.FLOAT); + case DOUBLE: + return Schema.create(Schema.Type.DOUBLE); + case VARCHAR: + case CHAR: + case STRING: + return Schema.create(Schema.Type.STRING); + case DATE: + Schema date = Schema.create(Schema.Type.INT); + LogicalTypes.date().addToSchema(date); + return date; + case TIMESTAMP: + // Cannot distinguish between TIMESTAMP_MILLIS and TIMESTAMP_MICROS + // Assume 
TIMESTAMP_MILLIS because Timestamp in ORC is in millis + Schema timestamp = Schema.create(Schema.Type.LONG); + LogicalTypes.timestampMillis().addToSchema(timestamp); + return timestamp; + case BINARY: + return Schema.create(Schema.Type.BYTES); + case DECIMAL: + Schema decimal = Schema.create(Schema.Type.BYTES); + LogicalTypes.decimal(orcSchema.getPrecision(), orcSchema.getScale()).addToSchema(decimal); + return decimal; + case LIST: + return Schema.createArray(createAvroSchema(orcSchema.getChildren().get(0))); + case MAP: + return Schema.createMap(createAvroSchema(orcSchema.getChildren().get(1))); + case STRUCT: + List<Field> childFields = new ArrayList<>(); + for (int i = 0; i < orcSchema.getChildren().size(); i++) { + TypeDescription childType = orcSchema.getChildren().get(i); + String childName = orcSchema.getFieldNames().get(i); + childFields.add(new Field(childName, createAvroSchema(childType), "", null)); + } + return Schema.createRecord(childFields); + case UNION: + return Schema.createUnion(orcSchema.getChildren().stream() + .map(AvroOrcUtils::createAvroSchema) + .collect(Collectors.toList())); + default: + throw new IllegalStateException(String.format("Unrecognized ORC type: %s", orcSchema.getCategory().getName())); + } + } + + /** + * Returns the actual schema of a field. + * + * All types in ORC is nullable whereas Avro uses a union that contains the NULL type to imply + * the nullability of an Avro type. To achieve consistency between the Avro and ORC schema, + * non-NULL types are extracted from the union type. + * @param unionSchema A schema of union type. + * @return An Avro schema that is either NULL or a UNION without NULL fields. + */ + private static Schema getActualSchemaType(Schema unionSchema) { + final List<Schema> nonNullMembers = unionSchema.getTypes().stream().filter( + schema -> !Schema.Type.NULL.equals(schema.getType()) + ).collect(Collectors.toList()); + if (nonNullMembers.isEmpty()) { + return Schema.create(Schema.Type.NULL); + } else if (nonNullMembers.size() == 1) { + return nonNullMembers.get(0); + } else { + return Schema.createUnion(nonNullMembers); + } + } +} diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/BaseFileUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/util/BaseFileUtils.java index c52d700..9b95e16 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/util/BaseFileUtils.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/util/BaseFileUtils.java @@ -18,6 +18,7 @@ package org.apache.hudi.common.util; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; @@ -25,16 +26,22 @@ import org.apache.avro.Schema; import org.apache.avro.generic.GenericRecord; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; +import org.apache.hudi.avro.HoodieAvroWriteSupport; import org.apache.hudi.common.bloom.BloomFilter; +import org.apache.hudi.common.bloom.BloomFilterFactory; +import org.apache.hudi.common.bloom.BloomFilterTypeCode; import org.apache.hudi.common.model.HoodieFileFormat; import org.apache.hudi.common.model.HoodieKey; import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.exception.HoodieException; public abstract class BaseFileUtils { public static BaseFileUtils getInstance(String path) { if (path.endsWith(HoodieFileFormat.PARQUET.getFileExtension())) { return new ParquetUtils(); + } else if (path.endsWith(HoodieFileFormat.ORC.getFileExtension())) { + return new OrcUtils(); } throw new 
UnsupportedOperationException("The format for file " + path + " is not supported yet."); } @@ -42,6 +49,8 @@ public abstract class BaseFileUtils { public static BaseFileUtils getInstance(HoodieFileFormat fileFormat) { if (HoodieFileFormat.PARQUET.equals(fileFormat)) { return new ParquetUtils(); + } else if (HoodieFileFormat.ORC.equals(fileFormat)) { + return new OrcUtils(); } throw new UnsupportedOperationException(fileFormat.name() + " format not supported yet."); } @@ -50,24 +59,122 @@ public abstract class BaseFileUtils { return getInstance(metaClient.getTableConfig().getBaseFileFormat()); } - public abstract Set<String> readRowKeys(Configuration configuration, Path filePath); - - public abstract Set<String> filterRowKeys(Configuration configuration, Path filePath, Set<String> filter); - - public abstract List<HoodieKey> fetchRecordKeyPartitionPath(Configuration configuration, Path filePath); - - public abstract Schema readAvroSchema(Configuration configuration, Path filePath); + /** + * Read the rowKey list from the given data file. + * @param filePath The data file path + * @param configuration configuration to build fs object + * @return Set Set of row keys + */ + public Set<String> readRowKeys(Configuration configuration, Path filePath) { + return filterRowKeys(configuration, filePath, new HashSet<>()); + } - public abstract BloomFilter readBloomFilterFromMetadata(Configuration configuration, Path filePath); + /** + * Read the bloom filter from the metadata of the given data file. + * @param configuration Configuration + * @param filePath The data file path + * @return a BloomFilter object + */ + public BloomFilter readBloomFilterFromMetadata(Configuration configuration, Path filePath) { + Map<String, String> footerVals = + readFooter(configuration, false, filePath, + HoodieAvroWriteSupport.HOODIE_AVRO_BLOOM_FILTER_METADATA_KEY, + HoodieAvroWriteSupport.OLD_HOODIE_AVRO_BLOOM_FILTER_METADATA_KEY, + HoodieAvroWriteSupport.HOODIE_BLOOM_FILTER_TYPE_CODE); + String footerVal = footerVals.get(HoodieAvroWriteSupport.HOODIE_AVRO_BLOOM_FILTER_METADATA_KEY); + if (null == footerVal) { + // We use old style key "com.uber.hoodie.bloomfilter" + footerVal = footerVals.get(HoodieAvroWriteSupport.OLD_HOODIE_AVRO_BLOOM_FILTER_METADATA_KEY); + } + BloomFilter toReturn = null; + if (footerVal != null) { + if (footerVals.containsKey(HoodieAvroWriteSupport.HOODIE_BLOOM_FILTER_TYPE_CODE)) { + toReturn = BloomFilterFactory.fromString(footerVal, + footerVals.get(HoodieAvroWriteSupport.HOODIE_BLOOM_FILTER_TYPE_CODE)); + } else { + toReturn = BloomFilterFactory.fromString(footerVal, BloomFilterTypeCode.SIMPLE.name()); + } + } + return toReturn; + } - public abstract String[] readMinMaxRecordKeys(Configuration configuration, Path filePath); + /** + * Read the min and max record key from the metadata of the given data file. + * @param configuration Configuration + * @param filePath The data file path + * @return A array of two string where the first is min record key and the second is max record key + */ + public String[] readMinMaxRecordKeys(Configuration configuration, Path filePath) { + Map<String, String> minMaxKeys = readFooter(configuration, true, filePath, + HoodieAvroWriteSupport.HOODIE_MIN_RECORD_KEY_FOOTER, HoodieAvroWriteSupport.HOODIE_MAX_RECORD_KEY_FOOTER); + if (minMaxKeys.size() != 2) { + throw new HoodieException( + String.format("Could not read min/max record key out of footer correctly from %s. 
read) : %s", + filePath, minMaxKeys)); + } + return new String[] {minMaxKeys.get(HoodieAvroWriteSupport.HOODIE_MIN_RECORD_KEY_FOOTER), + minMaxKeys.get(HoodieAvroWriteSupport.HOODIE_MAX_RECORD_KEY_FOOTER)}; + } + /** + * Read the data file + * NOTE: This literally reads the entire file contents, thus should be used with caution. + * @param configuration Configuration + * @param filePath The data file path + * @return A list of GenericRecord + */ public abstract List<GenericRecord> readAvroRecords(Configuration configuration, Path filePath); + /** + * Read the data file using the given schema + * NOTE: This literally reads the entire file contents, thus should be used with caution. + * @param configuration Configuration + * @param filePath The data file path + * @return A list of GenericRecord + */ public abstract List<GenericRecord> readAvroRecords(Configuration configuration, Path filePath, Schema schema); - public abstract Map<String, String> readFooter(Configuration conf, boolean required, Path orcFilePath, - String... footerNames); + /** + * Read the footer data of the given data file. + * @param configuration Configuration + * @param required require the footer data to be in data file + * @param filePath The data file path + * @param footerNames The footer names to read + * @return A map where the key is the footer name and the value is the footer value + */ + public abstract Map<String, String> readFooter(Configuration configuration, boolean required, Path filePath, + String... footerNames); + + /** + * Returns the number of records in the data file. + * @param configuration Configuration + * @param filePath The data file path + */ + public abstract long getRowCount(Configuration configuration, Path filePath); + + /** + * Read the rowKey list matching the given filter, from the given data file. + * If the filter is empty, then this will return all the row keys. + * @param filePath The data file path + * @param configuration configuration to build fs object + * @param filter record keys filter + * @return Set Set of row keys matching candidateRecordKeys + */ + public abstract Set<String> filterRowKeys(Configuration configuration, Path filePath, Set<String> filter); + + /** + * Fetch {@link HoodieKey}s from the given data file. + * @param configuration configuration to build fs object + * @param filePath The data file path + * @return {@link List} of {@link HoodieKey}s fetched from the parquet file + */ + public abstract List<HoodieKey> fetchRecordKeyPartitionPath(Configuration configuration, Path filePath); - public abstract long getRowCount(Configuration conf, Path filePath); -} \ No newline at end of file + /** + * Read the Avro schema of the data file. + * @param configuration Configuration + * @param filePath The data file path + * @return The Avro schema of the data file + */ + public abstract Schema readAvroSchema(Configuration configuration, Path filePath); +} diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/OrcReaderIterator.java b/hudi-common/src/main/java/org/apache/hudi/common/util/OrcReaderIterator.java new file mode 100644 index 0000000..4b3caa7 --- /dev/null +++ b/hudi-common/src/main/java/org/apache/hudi/common/util/OrcReaderIterator.java @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.common.util; + +import java.util.List; +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericData; +import org.apache.avro.generic.GenericData.Record; +import org.apache.orc.storage.ql.exec.vector.VectorizedRowBatch; +import org.apache.hudi.exception.HoodieIOException; + +import org.apache.orc.RecordReader; +import org.apache.orc.TypeDescription; + +import java.io.IOException; +import java.util.Iterator; + +/** + * This class wraps a ORC reader and provides an iterator based api to read from an ORC file. + */ +public class OrcReaderIterator<T> implements Iterator<T> { + + private final RecordReader recordReader; + private final Schema avroSchema; + List<String> fieldNames; + List<TypeDescription> orcFieldTypes; + Schema[] avroFieldSchemas; + private VectorizedRowBatch batch; + private int rowInBatch; + private T next; + + public OrcReaderIterator(RecordReader recordReader, Schema schema, TypeDescription orcSchema) { + this.recordReader = recordReader; + this.avroSchema = schema; + this.fieldNames = orcSchema.getFieldNames(); + this.orcFieldTypes = orcSchema.getChildren(); + this.avroFieldSchemas = fieldNames.stream() + .map(fieldName -> avroSchema.getField(fieldName).schema()) + .toArray(size -> new Schema[size]); + this.batch = orcSchema.createRowBatch(); + this.rowInBatch = 0; + } + + /** + * If the current batch is empty, get a new one. + * @return true if we have rows available. 
+ * @throws IOException + */ + private boolean ensureBatch() throws IOException { + if (rowInBatch >= batch.size) { + rowInBatch = 0; + return recordReader.nextBatch(batch); + } + return true; + } + + @Override + public boolean hasNext() { + try { + ensureBatch(); + if (this.next == null) { + this.next = (T) readRecordFromBatch(); + } + return this.next != null; + } catch (IOException io) { + throw new HoodieIOException("unable to read next record from ORC file ", io); + } + } + + @Override + public T next() { + try { + // To handle case when next() is called before hasNext() + if (this.next == null) { + if (!hasNext()) { + throw new HoodieIOException("No more records left to read from ORC file"); + } + } + T retVal = this.next; + this.next = (T) readRecordFromBatch(); + return retVal; + } catch (IOException io) { + throw new HoodieIOException("unable to read next record from ORC file ", io); + } + } + + private GenericData.Record readRecordFromBatch() throws IOException { + // No more records left to read from ORC file + if (!ensureBatch()) { + return null; + } + + GenericData.Record record = new Record(avroSchema); + int numFields = orcFieldTypes.size(); + for (int i = 0; i < numFields; i++) { + Object data = AvroOrcUtils.readFromVector(orcFieldTypes.get(i), batch.cols[i], avroFieldSchemas[i], rowInBatch); + record.put(fieldNames.get(i), data); + } + rowInBatch++; + return record; + } +} \ No newline at end of file diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/OrcUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/util/OrcUtils.java new file mode 100644 index 0000000..9fc49a3 --- /dev/null +++ b/hudi-common/src/main/java/org/apache/hudi/common/util/OrcUtils.java @@ -0,0 +1,235 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi.common.util; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericRecord; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hudi.avro.HoodieAvroUtils; +import org.apache.hudi.common.fs.FSUtils; +import org.apache.hudi.common.model.HoodieKey; +import org.apache.orc.storage.ql.exec.vector.BytesColumnVector; +import org.apache.orc.storage.ql.exec.vector.VectorizedRowBatch; +import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.exception.HoodieException; +import org.apache.hudi.exception.HoodieIOException; +import org.apache.hudi.exception.MetadataNotFoundException; +import org.apache.orc.OrcFile; +import org.apache.orc.OrcProto.UserMetadataItem; +import org.apache.orc.Reader; +import org.apache.orc.Reader.Options; +import org.apache.orc.RecordReader; +import org.apache.orc.TypeDescription; + +/** + * Utility functions for ORC files. + */ +public class OrcUtils extends BaseFileUtils { + + /** + * Fetch {@link HoodieKey}s from the given ORC file. + * + * @param filePath The ORC file path. + * @param configuration configuration to build fs object + * @return {@link List} of {@link HoodieKey}s fetched from the ORC file + */ + @Override + public List<HoodieKey> fetchRecordKeyPartitionPath(Configuration configuration, Path filePath) { + List<HoodieKey> hoodieKeys = new ArrayList<>(); + try { + if (!filePath.getFileSystem(configuration).exists(filePath)) { + return new ArrayList<>(); + } + + Configuration conf = new Configuration(configuration); + conf.addResource(FSUtils.getFs(filePath.toString(), conf).getConf()); + Reader reader = OrcFile.createReader(filePath, OrcFile.readerOptions(conf)); + + Schema readSchema = HoodieAvroUtils.getRecordKeyPartitionPathSchema(); + TypeDescription orcSchema = AvroOrcUtils.createOrcSchema(readSchema); + List<String> fieldNames = orcSchema.getFieldNames(); + VectorizedRowBatch batch = orcSchema.createRowBatch(); + RecordReader recordReader = reader.rows(new Options(conf).schema(orcSchema)); + + // column indices for the RECORD_KEY_METADATA_FIELD, PARTITION_PATH_METADATA_FIELD fields + int keyCol = -1; + int partitionCol = -1; + for (int i = 0; i < fieldNames.size(); i++) { + if (fieldNames.get(i).equals(HoodieRecord.RECORD_KEY_METADATA_FIELD)) { + keyCol = i; + } + if (fieldNames.get(i).equals(HoodieRecord.PARTITION_PATH_METADATA_FIELD)) { + partitionCol = i; + } + } + if (keyCol == -1 || partitionCol == -1) { + throw new HoodieException(String.format("Couldn't find row keys or partition path in %s.", filePath)); + } + while (recordReader.nextBatch(batch)) { + BytesColumnVector rowKeys = (BytesColumnVector) batch.cols[keyCol]; + BytesColumnVector partitionPaths = (BytesColumnVector) batch.cols[partitionCol]; + for (int i = 0; i < batch.size; i++) { + String rowKey = rowKeys.toString(i); + String partitionPath = partitionPaths.toString(i); + hoodieKeys.add(new HoodieKey(rowKey, partitionPath)); + } + } + } catch (IOException e) { + throw new HoodieIOException("Failed to read from ORC file:" + filePath, e); + } + return hoodieKeys; + } + + /** + * NOTE: This literally reads the entire file contents, thus should be used with caution. 
+ */ + @Override + public List<GenericRecord> readAvroRecords(Configuration configuration, Path filePath) { + Schema avroSchema; + try { + Reader reader = OrcFile.createReader(filePath, OrcFile.readerOptions(configuration)); + avroSchema = AvroOrcUtils.createAvroSchema(reader.getSchema()); + } catch (IOException io) { + throw new HoodieIOException("Unable to read Avro records from an ORC file:" + filePath, io); + } + return readAvroRecords(configuration, filePath, avroSchema); + } + + /** + * NOTE: This literally reads the entire file contents, thus should be used with caution. + */ + @Override + public List<GenericRecord> readAvroRecords(Configuration configuration, Path filePath, Schema avroSchema) { + List<GenericRecord> records = new ArrayList<>(); + try { + Reader reader = OrcFile.createReader(filePath, OrcFile.readerOptions(configuration)); + TypeDescription orcSchema = reader.getSchema(); + RecordReader recordReader = reader.rows(new Options(configuration).schema(orcSchema)); + OrcReaderIterator<GenericRecord> iterator = new OrcReaderIterator<>(recordReader, avroSchema, orcSchema); + while (iterator.hasNext()) { + GenericRecord record = iterator.next(); + records.add(record); + } + } catch (IOException io) { + throw new HoodieIOException("Unable to create an ORC reader for ORC file:" + filePath, io); + } + return records; + } + + /** + * Read the rowKey list matching the given filter, from the given ORC file. If the filter is empty, then this will + * return all the rowkeys. + * + * @param conf configuration to build fs object. + * @param filePath The ORC file path. + * @param filter record keys filter + * @return Set Set of row keys matching candidateRecordKeys + */ + @Override + public Set<String> filterRowKeys(Configuration conf, Path filePath, Set<String> filter) + throws HoodieIOException { + try { + Reader reader = OrcFile.createReader(filePath, OrcFile.readerOptions(conf)); + Set<String> filteredRowKeys = new HashSet<>(); + TypeDescription schema = reader.getSchema(); + List<String> fieldNames = schema.getFieldNames(); + VectorizedRowBatch batch = schema.createRowBatch(); + RecordReader recordReader = reader.rows(new Options(conf).schema(schema)); + + // column index for the RECORD_KEY_METADATA_FIELD field + int colIndex = -1; + for (int i = 0; i < fieldNames.size(); i++) { + if (fieldNames.get(i).equals(HoodieRecord.RECORD_KEY_METADATA_FIELD)) { + colIndex = i; + break; + } + } + if (colIndex == -1) { + throw new HoodieException(String.format("Couldn't find row keys in %s.", filePath)); + } + while (recordReader.nextBatch(batch)) { + BytesColumnVector rowKeys = (BytesColumnVector) batch.cols[colIndex]; + for (int i = 0; i < batch.size; i++) { + String rowKey = rowKeys.toString(i); + if (filter.isEmpty() || filter.contains(rowKey)) { + filteredRowKeys.add(rowKey); + } + } + } + return filteredRowKeys; + } catch (IOException io) { + throw new HoodieIOException("Unable to read row keys for ORC file:" + filePath, io); + } + } + + @Override + public Map<String, String> readFooter(Configuration conf, boolean required, + Path orcFilePath, String... 
footerNames) { + try { + Reader reader = OrcFile.createReader(orcFilePath, OrcFile.readerOptions(conf)); + Map<String, String> footerVals = new HashMap<>(); + List<UserMetadataItem> metadataItemList = reader.getFileTail().getFooter().getMetadataList(); + Map<String, String> metadata = metadataItemList.stream().collect(Collectors.toMap( + UserMetadataItem::getName, + metadataItem -> metadataItem.getValue().toStringUtf8())); + for (String footerName : footerNames) { + if (metadata.containsKey(footerName)) { + footerVals.put(footerName, metadata.get(footerName)); + } else if (required) { + throw new MetadataNotFoundException( + "Could not find index in ORC footer. Looked for key " + footerName + " in " + + orcFilePath); + } + } + return footerVals; + } catch (IOException io) { + throw new HoodieIOException("Unable to read footer for ORC file:" + orcFilePath, io); + } + } + + @Override + public Schema readAvroSchema(Configuration conf, Path orcFilePath) { + try { + Reader reader = OrcFile.createReader(orcFilePath, OrcFile.readerOptions(conf)); + TypeDescription orcSchema = reader.getSchema(); + return AvroOrcUtils.createAvroSchema(orcSchema); + } catch (IOException io) { + throw new HoodieIOException("Unable to get Avro schema for ORC file:" + orcFilePath, io); + } + } + + @Override + public long getRowCount(Configuration conf, Path orcFilePath) { + try { + Reader reader = OrcFile.createReader(orcFilePath, OrcFile.readerOptions(conf)); + return reader.getNumberOfRows(); + } catch (IOException io) { + throw new HoodieIOException("Unable to get row count for ORC file:" + orcFilePath, io); + } + } +} diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/ParquetUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/util/ParquetUtils.java index c7b3a3f..bd44724 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/util/ParquetUtils.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/util/ParquetUtils.java @@ -19,14 +19,9 @@ package org.apache.hudi.common.util; import org.apache.hudi.avro.HoodieAvroUtils; -import org.apache.hudi.avro.HoodieAvroWriteSupport; -import org.apache.hudi.common.bloom.BloomFilter; -import org.apache.hudi.common.bloom.BloomFilterFactory; -import org.apache.hudi.common.bloom.BloomFilterTypeCode; import org.apache.hudi.common.fs.FSUtils; import org.apache.hudi.common.model.HoodieKey; import org.apache.hudi.common.model.HoodieRecord; -import org.apache.hudi.exception.HoodieException; import org.apache.hudi.exception.HoodieIOException; import org.apache.hudi.exception.MetadataNotFoundException; @@ -58,18 +53,6 @@ import java.util.function.Function; public class ParquetUtils extends BaseFileUtils { /** - * Read the rowKey list from the given parquet file. - * - * @param filePath The parquet file path. - * @param configuration configuration to build fs object - * @return Set Set of row keys - */ - @Override - public Set<String> readRowKeys(Configuration configuration, Path filePath) { - return filterRowKeys(configuration, filePath, new HashSet<>()); - } - - /** * Read the rowKey list matching the given filter, from the given parquet file. If the filter is empty, then this will * return all the rowkeys. * @@ -196,47 +179,8 @@ public class ParquetUtils extends BaseFileUtils { @Override public Schema readAvroSchema(Configuration configuration, Path parquetFilePath) { - return new AvroSchemaConverter(configuration).convert(readSchema(configuration, parquetFilePath)); - } - - /** - * Read out the bloom filter from the parquet file meta data. 
- */ - @Override - public BloomFilter readBloomFilterFromMetadata(Configuration configuration, Path parquetFilePath) { - Map<String, String> footerVals = - readFooter(configuration, false, parquetFilePath, - HoodieAvroWriteSupport.HOODIE_AVRO_BLOOM_FILTER_METADATA_KEY, - HoodieAvroWriteSupport.OLD_HOODIE_AVRO_BLOOM_FILTER_METADATA_KEY, - HoodieAvroWriteSupport.HOODIE_BLOOM_FILTER_TYPE_CODE); - String footerVal = footerVals.get(HoodieAvroWriteSupport.HOODIE_AVRO_BLOOM_FILTER_METADATA_KEY); - if (null == footerVal) { - // We use old style key "com.uber.hoodie.bloomfilter" - footerVal = footerVals.get(HoodieAvroWriteSupport.OLD_HOODIE_AVRO_BLOOM_FILTER_METADATA_KEY); - } - BloomFilter toReturn = null; - if (footerVal != null) { - if (footerVals.containsKey(HoodieAvroWriteSupport.HOODIE_BLOOM_FILTER_TYPE_CODE)) { - toReturn = BloomFilterFactory.fromString(footerVal, - footerVals.get(HoodieAvroWriteSupport.HOODIE_BLOOM_FILTER_TYPE_CODE)); - } else { - toReturn = BloomFilterFactory.fromString(footerVal, BloomFilterTypeCode.SIMPLE.name()); - } - } - return toReturn; - } - - @Override - public String[] readMinMaxRecordKeys(Configuration configuration, Path parquetFilePath) { - Map<String, String> minMaxKeys = readFooter(configuration, true, parquetFilePath, - HoodieAvroWriteSupport.HOODIE_MIN_RECORD_KEY_FOOTER, HoodieAvroWriteSupport.HOODIE_MAX_RECORD_KEY_FOOTER); - if (minMaxKeys.size() != 2) { - throw new HoodieException( - String.format("Could not read min/max record key out of footer correctly from %s. read) : %s", - parquetFilePath, minMaxKeys)); - } - return new String[] {minMaxKeys.get(HoodieAvroWriteSupport.HOODIE_MIN_RECORD_KEY_FOOTER), - minMaxKeys.get(HoodieAvroWriteSupport.HOODIE_MAX_RECORD_KEY_FOOTER)}; + MessageType parquetSchema = readSchema(configuration, parquetFilePath); + return new AvroSchemaConverter(configuration).convert(parquetSchema); } /** diff --git a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieFileReaderFactory.java b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieFileReaderFactory.java index ff559c5..f913df7 100644 --- a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieFileReaderFactory.java +++ b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieFileReaderFactory.java @@ -27,6 +27,7 @@ import org.apache.hadoop.hbase.io.hfile.CacheConfig; import java.io.IOException; +import static org.apache.hudi.common.model.HoodieFileFormat.ORC; import static org.apache.hudi.common.model.HoodieFileFormat.PARQUET; import static org.apache.hudi.common.model.HoodieFileFormat.HFILE; @@ -40,6 +41,9 @@ public class HoodieFileReaderFactory { if (HFILE.getFileExtension().equals(extension)) { return newHFileFileReader(conf, path); } + if (ORC.getFileExtension().equals(extension)) { + return newOrcFileReader(conf, path); + } throw new UnsupportedOperationException(extension + " format not supported yet."); } @@ -52,4 +56,8 @@ public class HoodieFileReaderFactory { CacheConfig cacheConfig = new CacheConfig(conf); return new HoodieHFileReader<>(conf, path, cacheConfig); } + + private static <R extends IndexedRecord> HoodieFileReader<R> newOrcFileReader(Configuration conf, Path path) { + return new HoodieOrcReader<>(conf, path); + } } diff --git a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieOrcReader.java b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieOrcReader.java new file mode 100644 index 0000000..319f8d7 --- /dev/null +++ b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieOrcReader.java @@ -0,0 +1,91 @@ +/* 
+ * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.io.storage; + +import java.io.IOException; +import java.util.Iterator; +import java.util.Set; +import org.apache.avro.Schema; +import org.apache.avro.generic.IndexedRecord; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hudi.common.bloom.BloomFilter; +import org.apache.hudi.common.model.HoodieFileFormat; +import org.apache.hudi.common.util.AvroOrcUtils; +import org.apache.hudi.common.util.BaseFileUtils; +import org.apache.hudi.common.util.OrcReaderIterator; +import org.apache.hudi.exception.HoodieIOException; +import org.apache.orc.OrcFile; +import org.apache.orc.Reader; +import org.apache.orc.Reader.Options; +import org.apache.orc.RecordReader; +import org.apache.orc.TypeDescription; + +public class HoodieOrcReader<R extends IndexedRecord> implements HoodieFileReader { + private Path path; + private Configuration conf; + private final BaseFileUtils orcUtils; + + public HoodieOrcReader(Configuration configuration, Path path) { + this.conf = configuration; + this.path = path; + this.orcUtils = BaseFileUtils.getInstance(HoodieFileFormat.ORC); + } + + @Override + public String[] readMinMaxRecordKeys() { + return orcUtils.readMinMaxRecordKeys(conf, path); + } + + @Override + public BloomFilter readBloomFilter() { + return orcUtils.readBloomFilterFromMetadata(conf, path); + } + + @Override + public Set<String> filterRowKeys(Set candidateRowKeys) { + return orcUtils.filterRowKeys(conf, path, candidateRowKeys); + } + + @Override + public Iterator<R> getRecordIterator(Schema schema) throws IOException { + try { + Reader reader = OrcFile.createReader(path, OrcFile.readerOptions(conf)); + TypeDescription orcSchema = AvroOrcUtils.createOrcSchema(schema); + RecordReader recordReader = reader.rows(new Options(conf).schema(orcSchema)); + return new OrcReaderIterator(recordReader, schema, orcSchema); + } catch (IOException io) { + throw new HoodieIOException("Unable to create an ORC reader.", io); + } + } + + @Override + public Schema getSchema() { + return orcUtils.readAvroSchema(conf, path); + } + + @Override + public void close() { + } + + @Override + public long getTotalRecords() { + return orcUtils.getRowCount(conf, path); + } +} diff --git a/hudi-common/src/test/java/org/apache/hudi/common/util/TestAvroOrcUtils.java b/hudi-common/src/test/java/org/apache/hudi/common/util/TestAvroOrcUtils.java new file mode 100644 index 0000000..b775a37 --- /dev/null +++ b/hudi-common/src/test/java/org/apache/hudi/common/util/TestAvroOrcUtils.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.common.util; + +import static org.apache.hudi.common.testutils.HoodieTestDataGenerator.AVRO_SCHEMA; + +import java.util.Arrays; +import java.util.List; +import org.apache.avro.Schema; +import org.apache.hudi.common.testutils.HoodieCommonTestHarness; +import org.apache.orc.TypeDescription; + +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +public class TestAvroOrcUtils extends HoodieCommonTestHarness { + + public static List<Arguments> testCreateOrcSchemaArgs() { + // the ORC schema is constructed in the order as AVRO_SCHEMA: + // TRIP_SCHEMA_PREFIX, EXTRA_TYPE_SCHEMA, MAP_TYPE_SCHEMA, FARE_NESTED_SCHEMA, TIP_NESTED_SCHEMA, TRIP_SCHEMA_SUFFIX + // The following types are tested: + // DATE, DECIMAL, LONG, INT, BYTES, ARRAY, RECORD, MAP, STRING, FLOAT, DOUBLE + TypeDescription orcSchema = TypeDescription.fromString("struct<" + + "timestamp:bigint,_row_key:string,rider:string,driver:string,begin_lat:double," + + "begin_lon:double,end_lat:double,end_lon:double," + + "distance_in_meters:int,seconds_since_epoch:bigint,weight:float,nation:binary," + + "current_date:date,current_ts:bigint,height:decimal(10,6)," + + "city_to_state:map<string,string>," + + "fare:struct<amount:double,currency:string>," + + "tip_history:array<struct<amount:double,currency:string>>," + + "_hoodie_is_deleted:boolean>"); + + // Tests the types FIXED, UNION + String structField = "{\"type\":\"record\", \"name\":\"fare\",\"fields\": " + + "[{\"name\": \"amount\",\"type\": \"double\"},{\"name\": \"currency\", \"type\": \"string\"}]}"; + Schema avroSchemaWithMoreTypes = new Schema.Parser().parse( + "{\"type\": \"record\"," + "\"name\": \"triprec\"," + "\"fields\": [ " + + "{\"name\" : \"age\", \"type\":{\"type\": \"fixed\", \"size\": 16, \"name\": \"fixedField\" }}," + + "{\"name\" : \"height\", \"type\": [\"int\", \"null\"] }," + + "{\"name\" : \"id\", \"type\": [\"int\", \"string\"] }," + + "{\"name\" : \"fare\", \"type\": [" + structField + ", \"null\"] }]}"); + TypeDescription orcSchemaWithMoreTypes = TypeDescription.fromString( + "struct<age:binary,height:int,id:uniontype<int,string>,fare:struct<amount:double,currency:string>>"); + + return Arrays.asList( + Arguments.of(AVRO_SCHEMA, orcSchema), + Arguments.of(avroSchemaWithMoreTypes, orcSchemaWithMoreTypes) + ); + } + + @ParameterizedTest + @MethodSource("testCreateOrcSchemaArgs") + public void testCreateOrcSchema(Schema avroSchema, TypeDescription orcSchema) { + TypeDescription convertedSchema = AvroOrcUtils.createOrcSchema(avroSchema); + assertEquals(orcSchema, convertedSchema); + } +} diff --git 
a/hudi-common/src/test/java/org/apache/hudi/common/util/TestOrcReaderIterator.java b/hudi-common/src/test/java/org/apache/hudi/common/util/TestOrcReaderIterator.java new file mode 100644 index 0000000..b55995c --- /dev/null +++ b/hudi-common/src/test/java/org/apache/hudi/common/util/TestOrcReaderIterator.java @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.common.util; + +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericRecord; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.orc.CompressionKind; +import org.apache.orc.OrcFile; +import org.apache.orc.Reader; +import org.apache.orc.RecordReader; +import org.apache.orc.TypeDescription; +import org.apache.orc.Writer; +import org.apache.orc.storage.ql.exec.vector.BytesColumnVector; +import org.apache.orc.storage.ql.exec.vector.LongColumnVector; +import org.apache.orc.storage.ql.exec.vector.VectorizedRowBatch; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.io.File; +import java.nio.charset.StandardCharsets; +import java.util.Iterator; + +import static org.apache.hudi.common.testutils.SchemaTestUtil.getSchemaFromResource; +import static org.junit.jupiter.api.Assertions.assertEquals; + +public class TestOrcReaderIterator { + private final Path filePath = new Path(System.getProperty("java.io.tmpdir") + "/f1_1-0-1_000.orc"); + + @BeforeEach + @AfterEach + public void clearTempFile() { + File file = new File(filePath.toString()); + if (file.exists()) { + file.delete(); + } + } + + @Test + public void testOrcIteratorReadData() throws Exception { + final Configuration conf = new Configuration(); + Schema avroSchema = getSchemaFromResource(TestOrcReaderIterator.class, "/simple-test.avsc"); + TypeDescription orcSchema = AvroOrcUtils.createOrcSchema(avroSchema); + OrcFile.WriterOptions options = OrcFile.writerOptions(conf).setSchema(orcSchema).compress(CompressionKind.ZLIB); + Writer writer = OrcFile.createWriter(filePath, options); + VectorizedRowBatch batch = orcSchema.createRowBatch(); + BytesColumnVector nameColumns = (BytesColumnVector) batch.cols[0]; + LongColumnVector numberColumns = (LongColumnVector) batch.cols[1]; + BytesColumnVector colorColumns = (BytesColumnVector) batch.cols[2]; + for (int r = 0; r < 5; ++r) { + int row = batch.size++; + byte[] name = ("name" + r).getBytes(StandardCharsets.UTF_8); + nameColumns.setVal(row, name); + byte[] color = ("color" + r).getBytes(StandardCharsets.UTF_8); + colorColumns.setVal(row, color); + numberColumns.vector[row] = r; + } + writer.addRowBatch(batch); + writer.close(); + + Reader reader = OrcFile.createReader(filePath, 
OrcFile.readerOptions(conf)); + RecordReader recordReader = reader.rows(new Reader.Options(conf).schema(orcSchema)); + Iterator<GenericRecord> iterator = new OrcReaderIterator<>(recordReader, avroSchema, orcSchema); + int recordCount = 0; + while (iterator.hasNext()) { + GenericRecord record = iterator.next(); + assertEquals("name" + recordCount, record.get("name").toString()); + assertEquals("color" + recordCount, record.get("favorite_color").toString()); + assertEquals(recordCount, record.get("favorite_number")); + recordCount++; + } + assertEquals(5, recordCount); + } +} diff --git a/hudi-common/src/test/java/org/apache/hudi/io/storage/TestHoodieFileReaderFactory.java b/hudi-common/src/test/java/org/apache/hudi/io/storage/TestHoodieFileReaderFactory.java index 13971d5..ec334bd 100644 --- a/hudi-common/src/test/java/org/apache/hudi/io/storage/TestHoodieFileReaderFactory.java +++ b/hudi-common/src/test/java/org/apache/hudi/io/storage/TestHoodieFileReaderFactory.java @@ -44,11 +44,16 @@ public class TestHoodieFileReaderFactory { HoodieFileReader<IndexedRecord> parquetReader = HoodieFileReaderFactory.getFileReader(hadoopConf, parquetPath); assertTrue(parquetReader instanceof HoodieParquetReader); - // other file format exception. + // log file format. final Path logPath = new Path("/partition/path/f.b51192a8-574b-4a85-b246-bcfec03ac8bf_100.log.2_1-0-1"); final Throwable thrown = assertThrows(UnsupportedOperationException.class, () -> { HoodieFileReader<IndexedRecord> logWriter = HoodieFileReaderFactory.getFileReader(hadoopConf, logPath); }, "should fail since log storage reader is not supported yet."); assertTrue(thrown.getMessage().contains("format not supported yet.")); + + // Orc file format. + final Path orcPath = new Path("/partition/path/f1_1-0-1_000.orc"); + HoodieFileReader<IndexedRecord> orcReader = HoodieFileReaderFactory.getFileReader(hadoopConf, orcPath); + assertTrue(orcReader instanceof HoodieOrcReader); } } diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieInputFormatUtils.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieInputFormatUtils.java index e49d012..b39ee34 100644 --- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieInputFormatUtils.java +++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieInputFormatUtils.java @@ -18,6 +18,9 @@ package org.apache.hudi.hadoop.utils; +import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat; +import org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat; +import org.apache.hadoop.hive.ql.io.orc.OrcSerde; import org.apache.hudi.common.config.HoodieMetadataConfig; import org.apache.hudi.common.engine.HoodieLocalEngineContext; import org.apache.hudi.common.fs.FSUtils; @@ -123,6 +126,8 @@ public class HoodieInputFormatUtils { } else { return HoodieHFileInputFormat.class.getName(); } + case ORC: + return OrcInputFormat.class.getName(); default: throw new HoodieIOException("Hoodie InputFormat not implemented for base file format " + baseFileFormat); } @@ -134,6 +139,8 @@ public class HoodieInputFormatUtils { return MapredParquetOutputFormat.class.getName(); case HFILE: return MapredParquetOutputFormat.class.getName(); + case ORC: + return OrcOutputFormat.class.getName(); default: throw new HoodieIOException("No OutputFormat for base file format " + baseFileFormat); } @@ -145,6 +152,8 @@ public class HoodieInputFormatUtils { return ParquetHiveSerDe.class.getName(); case HFILE: return ParquetHiveSerDe.class.getName(); + case ORC: + return OrcSerde.class.getName(); 
default: throw new HoodieIOException("No SerDe for base file format " + baseFileFormat); } diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/DefaultSource.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/DefaultSource.scala index 0b8234d..32bd9a4 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/DefaultSource.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/DefaultSource.scala @@ -19,7 +19,7 @@ package org.apache.hudi import org.apache.hadoop.fs.Path import org.apache.hudi.DataSourceReadOptions._ -import org.apache.hudi.common.model.HoodieRecord +import org.apache.hudi.common.model.{HoodieFileFormat, HoodieRecord} import org.apache.hudi.DataSourceWriteOptions.{BOOTSTRAP_OPERATION_OPT_VAL, OPERATION_OPT_KEY} import org.apache.hudi.common.fs.FSUtils import org.apache.hudi.common.model.HoodieTableType.{COPY_ON_WRITE, MERGE_ON_READ} @@ -28,6 +28,7 @@ import org.apache.hudi.exception.HoodieException import org.apache.hudi.hadoop.HoodieROTablePathFilter import org.apache.log4j.LogManager import org.apache.spark.sql.execution.datasources.{DataSource, FileStatusCache, HadoopFsRelation} +import org.apache.spark.sql.execution.datasources.orc.OrcFileFormat import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat import org.apache.spark.sql.execution.streaming.{Sink, Source} import org.apache.spark.sql.hudi.streaming.HoodieStreamSource @@ -186,6 +187,10 @@ class DefaultSource extends RelationProvider extraReadPaths: Seq[String], metaClient: HoodieTableMetaClient): BaseRelation = { log.info("Loading Base File Only View with options :" + optParams) + val (tableFileFormat, formatClassName) = metaClient.getTableConfig.getBaseFileFormat match { + case HoodieFileFormat.PARQUET => (new ParquetFileFormat, "parquet") + case HoodieFileFormat.ORC => (new OrcFileFormat, "orc") + } if (useHoodieFileIndex) { @@ -198,7 +203,7 @@ class DefaultSource extends RelationProvider fileIndex.partitionSchema, fileIndex.dataSchema, bucketSpec = None, - fileFormat = new ParquetFileFormat, + fileFormat = tableFileFormat, optParams)(sqlContext.sparkSession) } else { // this is just effectively RO view only, where `path` can contain a mix of @@ -208,12 +213,12 @@ class DefaultSource extends RelationProvider classOf[HoodieROTablePathFilter], classOf[org.apache.hadoop.fs.PathFilter]) - // simply return as a regular parquet relation + // simply return as a regular relation DataSource.apply( sparkSession = sqlContext.sparkSession, paths = extraReadPaths, userSpecifiedSchema = Option(schema), - className = "parquet", + className = formatClassName, options = optParams) .resolveRelation() } diff --git a/pom.xml b/pom.xml index a8054c2..c1b4a99 100644 --- a/pom.xml +++ b/pom.xml @@ -103,6 +103,8 @@ <hive.version>2.3.1</hive.version> <hive.exec.classifier>core</hive.exec.classifier> <metrics.version>4.1.1</metrics.version> + <orc.version>1.6.0</orc.version> + <airlift.version>0.16</airlift.version> <prometheus.version>0.8.0</prometheus.version> <http.version>4.4.1</http.version> <spark.version>${spark2.version}</spark.version>
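
For reference, a minimal, self-contained sketch of how the pieces introduced by this commit fit together: AvroOrcUtils.createOrcSchema converts an Avro record schema into an ORC TypeDescription, the plain ORC Writer fills a VectorizedRowBatch, and OrcReaderIterator reads the rows back as Avro GenericRecords. This sketch is not part of the patch; the two-field schema and the temp-file path are illustrative assumptions, and the flow mirrors TestOrcReaderIterator above.

import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.common.util.AvroOrcUtils;
import org.apache.hudi.common.util.OrcReaderIterator;
import org.apache.orc.OrcFile;
import org.apache.orc.Reader;
import org.apache.orc.RecordReader;
import org.apache.orc.TypeDescription;
import org.apache.orc.Writer;
import org.apache.orc.storage.ql.exec.vector.BytesColumnVector;
import org.apache.orc.storage.ql.exec.vector.LongColumnVector;
import org.apache.orc.storage.ql.exec.vector.VectorizedRowBatch;

public class OrcRoundTripSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Illustrative temp location; any Hadoop-accessible path works.
    Path path = new Path(System.getProperty("java.io.tmpdir") + "/orc-roundtrip-sketch.orc");

    // Hypothetical two-field Avro record schema used only for this sketch.
    Schema avroSchema = new Schema.Parser().parse(
        "{\"type\":\"record\",\"name\":\"rec\",\"fields\":["
            + "{\"name\":\"name\",\"type\":\"string\"},"
            + "{\"name\":\"number\",\"type\":\"long\"}]}");

    // Avro -> ORC schema conversion added by AvroOrcUtils in this commit.
    TypeDescription orcSchema = AvroOrcUtils.createOrcSchema(avroSchema);

    // Write one small row batch with the plain ORC writer API.
    Writer writer = OrcFile.createWriter(path, OrcFile.writerOptions(conf).setSchema(orcSchema));
    VectorizedRowBatch batch = orcSchema.createRowBatch();
    BytesColumnVector nameCol = (BytesColumnVector) batch.cols[0];
    LongColumnVector numberCol = (LongColumnVector) batch.cols[1];
    for (int r = 0; r < 3; r++) {
      int row = batch.size++;
      nameCol.setVal(row, ("name" + r).getBytes(StandardCharsets.UTF_8));
      numberCol.vector[row] = r;
    }
    writer.addRowBatch(batch);
    writer.close();

    // Read the rows back as Avro GenericRecords through the new OrcReaderIterator.
    Reader reader = OrcFile.createReader(path, OrcFile.readerOptions(conf));
    RecordReader rows = reader.rows(new Reader.Options(conf).schema(orcSchema));
    Iterator<GenericRecord> iterator = new OrcReaderIterator<>(rows, avroSchema, orcSchema);
    while (iterator.hasNext()) {
      GenericRecord record = iterator.next();
      System.out.println(record.get("name") + " -> " + record.get("number"));
    }
    rows.close();
  }
}

Higher-level access goes through HoodieFileReaderFactory.getFileReader, which after this commit returns a HoodieOrcReader for paths ending in the .orc extension, and through BaseFileUtils.getInstance, which now dispatches to OrcUtils for ORC base files.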