This is an automated email from the ASF dual-hosted git repository.

martijnvisser pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
     new 363a69dad3c2  [FLINK-36652][Formats/ORC] Upgrade Apache ORC Version to 1.9.4 (#25711)
363a69dad3c2 is described below

commit 363a69dad3c22c9b7063f4eb9ba9ca17b930432a
Author: mzzx <33508807+dycc...@users.noreply.github.com>
AuthorDate: Mon Jan 20 20:29:02 2025 +0800

    [FLINK-36652][Formats/ORC] Upgrade Apache ORC Version to 1.9.4 (#25711)
---
 flink-formats/flink-orc-nohive/pom.xml             |  70 ++
 .../nohive/writer/NoHivePhysicalWriterImpl.java    |   2 +-
 .../OrcColumnarRowSplitReaderNoHiveTest.java       |   4 +-
 flink-formats/flink-orc/pom.xml                    |  36 +-
 .../flink/orc/writer/OrcBulkWriterFactory.java     |  23 +-
 .../flink/orc/writer/PhysicalWriterImpl.java       | 741 +++++++++++++++------
 .../flink/orc/OrcColumnarRowInputFormatTest.java   |  34 +-
 .../flink/orc/OrcColumnarRowSplitReaderTest.java   |  53 +-
 .../flink/orc/OrcFormatStatisticsReportTest.java   |   4 +-
 .../src/main/resources/META-INF/NOTICE             |  10 +-
 pom.xml                                            |   3 +-
 11 files changed, 734 insertions(+), 246 deletions(-)

diff --git a/flink-formats/flink-orc-nohive/pom.xml b/flink-formats/flink-orc-nohive/pom.xml
index e623f8cd31e6..0e3c9f70b90b 100644
--- a/flink-formats/flink-orc-nohive/pom.xml
+++ b/flink-formats/flink-orc-nohive/pom.xml
@@ -87,6 +87,76 @@ under the License.
 					<groupId>org.slf4j</groupId>
 					<artifactId>slf4j-reload4j</artifactId>
 				</exclusion>
+				<exclusion>
+					<groupId>org.slf4j</groupId>
+					<artifactId>slf4j-api</artifactId>
+				</exclusion>
+			</exclusions>
+		</dependency>
+		<dependency>
+			<groupId>com.google.protobuf</groupId>
+			<artifactId>protobuf-java</artifactId>
+			<version>${protoc.version}</version>
+			<scope>provided</scope>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.hive</groupId>
+			<artifactId>hive-storage-api</artifactId>
+			<version>${storage-api.version}</version>
+			<scope>provided</scope>
+			<exclusions>
+				<exclusion>
+					<groupId>org.apache.hadoop</groupId>
+					<artifactId>hadoop-hdfs</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>org.apache.hadoop</groupId>
+					<artifactId>hadoop-common</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>org.slf4j</groupId>
+					<artifactId>slf4j-api</artifactId>
+				</exclusion>
+			</exclusions>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.hadoop</groupId>
+			<artifactId>hadoop-common</artifactId>
+			<scope>provided</scope>
+			<exclusions>
+				<exclusion>
+					<groupId>ch.qos.reload4j</groupId>
+					<artifactId>reload4j</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>org.slf4j</groupId>
+					<artifactId>slf4j-reload4j</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>com.google.protobuf</groupId>
+					<artifactId>protobuf-java</artifactId>
+				</exclusion>
+			</exclusions>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.hadoop</groupId>
+			<artifactId>hadoop-hdfs</artifactId>
+			<scope>provided</scope>
+			<exclusions>
+				<exclusion>
+					<groupId>ch.qos.reload4j</groupId>
+					<artifactId>reload4j</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>org.slf4j</groupId>
+					<artifactId>slf4j-reload4j</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>com.google.protobuf</groupId>
+					<artifactId>protobuf-java</artifactId>
+				</exclusion>
 			</exclusions>
 		</dependency>
diff --git a/flink-formats/flink-orc-nohive/src/main/java/org/apache/flink/orc/nohive/writer/NoHivePhysicalWriterImpl.java b/flink-formats/flink-orc-nohive/src/main/java/org/apache/flink/orc/nohive/writer/NoHivePhysicalWriterImpl.java
index 0734e9bf90bc..30c417111cb3 100644
--- a/flink-formats/flink-orc-nohive/src/main/java/org/apache/flink/orc/nohive/writer/NoHivePhysicalWriterImpl.java
+++ b/flink-formats/flink-orc-nohive/src/main/java/org/apache/flink/orc/nohive/writer/NoHivePhysicalWriterImpl.java @@ -21,9 +21,9 @@ package org.apache.flink.orc.nohive.writer; import org.apache.flink.core.fs.FSDataOutputStream; import org.apache.flink.orc.writer.PhysicalWriterImpl; -import com.google.protobuf25.CodedOutputStream; import org.apache.orc.OrcFile; import org.apache.orc.OrcProto; +import org.apache.orc.protobuf.CodedOutputStream; import java.io.IOException; diff --git a/flink-formats/flink-orc-nohive/src/test/java/org/apache/flink/orc/nohive/OrcColumnarRowSplitReaderNoHiveTest.java b/flink-formats/flink-orc-nohive/src/test/java/org/apache/flink/orc/nohive/OrcColumnarRowSplitReaderNoHiveTest.java index 425ae3105a3c..de2f4fd8662f 100644 --- a/flink-formats/flink-orc-nohive/src/test/java/org/apache/flink/orc/nohive/OrcColumnarRowSplitReaderNoHiveTest.java +++ b/flink-formats/flink-orc-nohive/src/test/java/org/apache/flink/orc/nohive/OrcColumnarRowSplitReaderNoHiveTest.java @@ -36,7 +36,6 @@ import java.io.IOException; import java.sql.Timestamp; import java.util.ArrayList; import java.util.Map; -import java.util.stream.IntStream; /** Test for {@link OrcColumnarRowSplitReader}. */ class OrcColumnarRowSplitReaderNoHiveTest extends OrcColumnarRowSplitReaderTest { @@ -100,12 +99,13 @@ class OrcColumnarRowSplitReaderNoHiveTest extends OrcColumnarRowSplitReaderTest protected OrcColumnarRowSplitReader createReader( int[] selectedFields, DataType[] fullTypes, + String[] fullNames, Map<String, Object> partitionSpec, FileInputSplit split) throws IOException { return OrcNoHiveSplitReaderUtil.genPartColumnarRowReader( new Configuration(), - IntStream.range(0, fullTypes.length).mapToObj(i -> "f" + i).toArray(String[]::new), + fullNames, fullTypes, partitionSpec, selectedFields, diff --git a/flink-formats/flink-orc/pom.xml b/flink-formats/flink-orc/pom.xml index 3f92726ba62c..d5176a8a036c 100644 --- a/flink-formats/flink-orc/pom.xml +++ b/flink-formats/flink-orc/pom.xml @@ -101,7 +101,11 @@ under the License. </exclusion> <exclusion> <groupId>org.slf4j</groupId> - <artifactId>slf4j-reload4j</artifactId> + <artifactId>slf4j-api</artifactId> + </exclusion> + <exclusion> + <groupId>org.apache.commons</groupId> + <artifactId>commons-lang3</artifactId> </exclusion> </exclusions> </dependency> @@ -125,6 +129,10 @@ under the License. <groupId>org.slf4j</groupId> <artifactId>slf4j-reload4j</artifactId> </exclusion> + <exclusion> + <groupId>com.google.protobuf</groupId> + <artifactId>protobuf-java</artifactId> + </exclusion> </exclusions> </dependency> @@ -141,6 +149,32 @@ under the License. 
<groupId>org.slf4j</groupId> <artifactId>slf4j-reload4j</artifactId> </exclusion> + <exclusion> + <groupId>com.google.protobuf</groupId> + <artifactId>protobuf-java</artifactId> + </exclusion> + </exclusions> + </dependency> + + <dependency> + <groupId>com.google.protobuf</groupId> + <artifactId>protobuf-java</artifactId> + <version>${protoc.version}</version> + </dependency> + + <dependency> + <groupId>org.apache.hive</groupId> + <artifactId>hive-storage-api</artifactId> + <version>${storage-api.version}</version> + <exclusions> + <exclusion> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-hdfs</artifactId> + </exclusion> + <exclusion> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-api</artifactId> + </exclusion> </exclusions> </dependency> diff --git a/flink-formats/flink-orc/src/main/java/org/apache/flink/orc/writer/OrcBulkWriterFactory.java b/flink-formats/flink-orc/src/main/java/org/apache/flink/orc/writer/OrcBulkWriterFactory.java index 5e4310107ab4..010deb824a09 100644 --- a/flink-formats/flink-orc/src/main/java/org/apache/flink/orc/writer/OrcBulkWriterFactory.java +++ b/flink-formats/flink-orc/src/main/java/org/apache/flink/orc/writer/OrcBulkWriterFactory.java @@ -28,8 +28,10 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.orc.OrcFile; import org.apache.orc.impl.WriterImpl; +import org.apache.orc.impl.writer.WriterEncryptionVariant; import java.io.IOException; +import java.lang.reflect.Field; import java.util.HashMap; import java.util.Map; import java.util.Properties; @@ -96,14 +98,29 @@ public class OrcBulkWriterFactory<T> implements BulkWriter.Factory<T> { @Override public BulkWriter<T> create(FSDataOutputStream out) throws IOException { OrcFile.WriterOptions opts = getWriterOptions(); - opts.physicalWriter(new PhysicalWriterImpl(out, opts)); - + PhysicalWriterImpl physicalWriter = new PhysicalWriterImpl(out, opts); + opts.physicalWriter(physicalWriter); // The path of the Writer is not used to indicate the destination file // in this case since we have used a dedicated physical writer to write // to the give output stream directly. However, the path would be used as // the key of writer in the ORC memory manager, thus we need to make it unique. Path unusedPath = new Path(UUID.randomUUID().toString()); - return new OrcBulkWriter<>(vectorizer, new WriterImpl(null, unusedPath, opts)); + WriterImpl writer = new WriterImpl(null, unusedPath, opts); + + // Obtaining encryption variant from Writer, and setting encryption variant for + // physicalWriter. 
+ try { + Field encryptionFiled = WriterImpl.class.getDeclaredField("encryption"); + encryptionFiled.setAccessible(true); + WriterEncryptionVariant[] encryption = + (WriterEncryptionVariant[]) encryptionFiled.get(writer); + physicalWriter.setEncryptionVariant(encryption); + } catch (NoSuchFieldException | IllegalAccessException e) { + throw new RuntimeException( + "Can not access to the encryption field in Class org.apache.orc.impl.WriterImpl", + e); + } + return new OrcBulkWriter<>(vectorizer, writer); } @VisibleForTesting diff --git a/flink-formats/flink-orc/src/main/java/org/apache/flink/orc/writer/PhysicalWriterImpl.java b/flink-formats/flink-orc/src/main/java/org/apache/flink/orc/writer/PhysicalWriterImpl.java index 56e16c99c0a3..7dd8b3789649 100644 --- a/flink-formats/flink-orc/src/main/java/org/apache/flink/orc/writer/PhysicalWriterImpl.java +++ b/flink-formats/flink-orc/src/main/java/org/apache/flink/orc/writer/PhysicalWriterImpl.java @@ -21,16 +21,24 @@ package org.apache.flink.orc.writer; import org.apache.flink.annotation.Internal; import org.apache.flink.core.fs.FSDataOutputStream; +import com.google.protobuf.ByteString; import com.google.protobuf.CodedOutputStream; import org.apache.orc.CompressionCodec; -import org.apache.orc.CompressionKind; +import org.apache.orc.EncryptionVariant; import org.apache.orc.OrcFile; import org.apache.orc.OrcProto; import org.apache.orc.PhysicalWriter; +import org.apache.orc.TypeDescription; +import org.apache.orc.impl.CryptoUtils; import org.apache.orc.impl.HadoopShims; import org.apache.orc.impl.OrcCodecPool; import org.apache.orc.impl.OutStream; +import org.apache.orc.impl.SerializationUtils; import org.apache.orc.impl.StreamName; +import org.apache.orc.impl.WriterImpl; +import org.apache.orc.impl.writer.StreamOptions; +import org.apache.orc.impl.writer.WriterEncryptionKey; +import org.apache.orc.impl.writer.WriterEncryptionVariant; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -38,12 +46,11 @@ import java.io.IOException; import java.io.OutputStream; import java.nio.ByteBuffer; import java.util.ArrayList; +import java.util.Collection; import java.util.List; import java.util.Map; import java.util.TreeMap; -import static org.apache.orc.impl.WriterImpl.getEstimatedBufferSize; - /** * A slightly customised clone of {@link org.apache.orc.impl.PhysicalFsWriter}. 
* @@ -55,247 +62,227 @@ import static org.apache.orc.impl.WriterImpl.getEstimatedBufferSize; */ @Internal public class PhysicalWriterImpl implements PhysicalWriter { - private static final Logger LOG = LoggerFactory.getLogger(PhysicalWriterImpl.class); - private static final byte[] ZEROS = new byte[64 * 1024]; + private static final int HDFS_BUFFER_SIZE = 256 * 1024; + private static final byte[] ZEROS = new byte[64 * 1024]; + + private FSDataOutputStream rawWriter; + private final DirectStream rawStream; protected final OutStream writer; - private final CodedOutputStream protobufWriter; - private final CompressionKind compress; - private final Map<StreamName, BufferedStream> streams; + private final CodedOutputStream codedCompressStream; + private final HadoopShims shims; - private final int maxPadding; - private final int bufferSize; private final long blockSize; + private final int maxPadding; + private final StreamOptions compress; + private final OrcFile.CompressionStrategy compressionStrategy; private final boolean addBlockPadding; private final boolean writeVariableLengthBlocks; + private final VariantTracker unencrypted; - private CompressionCodec codec; - private FSDataOutputStream out; private long headerLength; private long stripeStart; + private long blockOffset; private int metadataLength; + private int stripeStatisticsLength = 0; private int footerLength; + private int stripeNumber = 0; + + private final Map<WriterEncryptionVariant, VariantTracker> variants = new TreeMap<>(); public PhysicalWriterImpl(FSDataOutputStream out, OrcFile.WriterOptions opts) throws IOException { + this(out, opts, new WriterEncryptionVariant[0]); + } + + public PhysicalWriterImpl( + FSDataOutputStream out, + OrcFile.WriterOptions opts, + WriterEncryptionVariant[] encryption) + throws IOException { + this.rawWriter = out; + long defaultStripeSize = opts.getStripeSize(); + this.addBlockPadding = opts.getBlockPadding(); if (opts.isEnforceBufferSize()) { - this.bufferSize = opts.getBufferSize(); + this.compress = new StreamOptions(opts.getBufferSize()); } else { - this.bufferSize = - getEstimatedBufferSize( - opts.getStripeSize(), - opts.getSchema().getMaximumId() + 1, - opts.getBufferSize()); + this.compress = + new StreamOptions( + WriterImpl.getEstimatedBufferSize( + defaultStripeSize, + opts.getSchema().getMaximumId() + 1, + opts.getBufferSize())); } - - this.out = out; - this.blockOffset = 0; + CompressionCodec codec = OrcCodecPool.getCodec(opts.getCompress()); + if (codec != null) { + compress.withCodec(codec, codec.getDefaultOptions()); + } + this.compressionStrategy = opts.getCompressionStrategy(); + this.maxPadding = (int) (opts.getPaddingTolerance() * defaultStripeSize); this.blockSize = opts.getBlockSize(); - this.maxPadding = (int) (opts.getPaddingTolerance() * (double) opts.getBufferSize()); - this.compress = opts.getCompress(); - this.codec = OrcCodecPool.getCodec(this.compress); - this.streams = new TreeMap<>(); - this.writer = - new OutStream("metadata", this.bufferSize, this.codec, new DirectStream(this.out)); - this.shims = opts.getHadoopShims(); - this.addBlockPadding = opts.getBlockPadding(); - this.protobufWriter = CodedOutputStream.newInstance(this.writer); - this.writeVariableLengthBlocks = opts.getWriteVariableLengthBlocks(); + blockOffset = 0; + unencrypted = new VariantTracker(opts.getSchema(), compress); + writeVariableLengthBlocks = opts.getWriteVariableLengthBlocks(); + shims = opts.getHadoopShims(); + rawStream = new DirectStream(rawWriter); + writer = new OutStream("stripe 
footer", compress, rawStream); + codedCompressStream = CodedOutputStream.newInstance(writer); + for (WriterEncryptionVariant variant : encryption) { + WriterEncryptionKey key = variant.getKeyDescription(); + StreamOptions encryptOptions = + new StreamOptions(unencrypted.options) + .withEncryption(key.getAlgorithm(), variant.getFileFooterKey()); + variants.put(variant, new VariantTracker(variant.getRoot(), encryptOptions)); + } } - @Override - public void writeHeader() throws IOException { - this.out.write("ORC".getBytes()); - this.headerLength = this.out.getPos(); + public void setEncryptionVariant(WriterEncryptionVariant[] encryption) { + if (encryption == null) { + return; + } + for (WriterEncryptionVariant variant : encryption) { + WriterEncryptionKey key = variant.getKeyDescription(); + StreamOptions encryptOptions = + new StreamOptions(unencrypted.options) + .withEncryption(key.getAlgorithm(), variant.getFileFooterKey()); + variants.put(variant, new VariantTracker(variant.getRoot(), encryptOptions)); + } } - @Override - public OutputReceiver createDataStream(StreamName name) throws IOException { - BufferedStream result = streams.get(name); + protected static class VariantTracker { + // the streams that make up the current stripe + protected final Map<StreamName, BufferedStream> streams = new TreeMap<>(); + private final int rootColumn; + private final int lastColumn; + protected final StreamOptions options; + // a list for each column covered by this variant + // the elements in the list correspond to each stripe in the file + protected final List<OrcProto.ColumnStatistics>[] stripeStats; + protected final List<OrcProto.Stream> stripeStatsStreams = new ArrayList<>(); + protected final OrcProto.ColumnStatistics[] fileStats; + + VariantTracker(TypeDescription schema, StreamOptions options) { + rootColumn = schema.getId(); + lastColumn = schema.getMaximumId(); + this.options = options; + stripeStats = new List[schema.getMaximumId() - schema.getId() + 1]; + for (int i = 0; i < stripeStats.length; ++i) { + stripeStats[i] = new ArrayList<>(); + } + fileStats = new OrcProto.ColumnStatistics[stripeStats.length]; + } - if (result == null) { - result = new BufferedStream(); + public BufferedStream createStream(StreamName name) { + BufferedStream result = new BufferedStream(); streams.put(name, result); + return result; } - return result; - } - - @Override - public void writeIndex(StreamName name, OrcProto.RowIndex.Builder index, CompressionCodec codec) - throws IOException { - OutputStream stream = - new OutStream(this.toString(), bufferSize, codec, createDataStream(name)); - index.build().writeTo(stream); - stream.flush(); - } - - @Override - public void writeBloomFilter( - StreamName name, OrcProto.BloomFilterIndex.Builder bloom, CompressionCodec codec) - throws IOException { - OutputStream stream = - new OutStream(this.toString(), bufferSize, codec, createDataStream(name)); - bloom.build().writeTo(stream); - stream.flush(); - } - - @Override - public void finalizeStripe( - OrcProto.StripeFooter.Builder footerBuilder, - OrcProto.StripeInformation.Builder dirEntry) - throws IOException { - long indexSize = 0; - long dataSize = 0; - - for (Map.Entry<StreamName, BufferedStream> pair : streams.entrySet()) { - BufferedStream receiver = pair.getValue(); - if (!receiver.isSuppressed) { - long streamSize = receiver.getOutputSize(); - StreamName name = pair.getKey(); - footerBuilder.addStreams( - OrcProto.Stream.newBuilder() - .setColumn(name.getColumn()) - .setKind(name.getKind()) - 
.setLength(streamSize)); - if (StreamName.Area.INDEX == name.getArea()) { - indexSize += streamSize; - } else { - dataSize += streamSize; + /** + * Place the streams in the appropriate area while updating the sizes with the number of + * bytes in the area. + * + * @param area the area to write + * @param sizes the sizes of the areas + * @return the list of stream descriptions to add + */ + public List<OrcProto.Stream> placeStreams(StreamName.Area area, SizeCounters sizes) { + List<OrcProto.Stream> result = new ArrayList<>(streams.size()); + for (Map.Entry<StreamName, BufferedStream> stream : streams.entrySet()) { + StreamName name = stream.getKey(); + BufferedStream bytes = stream.getValue(); + if (name.getArea() == area && !bytes.isSuppressed) { + OrcProto.Stream.Builder builder = OrcProto.Stream.newBuilder(); + long size = bytes.getOutputSize(); + if (area == StreamName.Area.INDEX) { + sizes.index += size; + } else { + sizes.data += size; + } + builder.setColumn(name.getColumn()).setKind(name.getKind()).setLength(size); + result.add(builder.build()); } } + return result; } - dirEntry.setIndexLength(indexSize).setDataLength(dataSize); - OrcProto.StripeFooter footer = footerBuilder.build(); - // Do we need to pad the file so the stripe doesn't straddle a block boundary? - padStripe(indexSize + dataSize + footer.getSerializedSize()); - - // write out the data streams - for (Map.Entry<StreamName, BufferedStream> pair : streams.entrySet()) { - pair.getValue().spillToDiskAndClear(out); + /** + * Write the streams in the appropriate area. + * + * @param area the area to write + * @param raw the raw stream to write to + */ + public void writeStreams(StreamName.Area area, FSDataOutputStream raw) throws IOException { + for (Map.Entry<StreamName, BufferedStream> stream : streams.entrySet()) { + if (stream.getKey().getArea() == area) { + stream.getValue().spillToDiskAndClear(raw); + } + } } - // Write out the footer. - writeStripeFooter(footer, dataSize, indexSize, dirEntry); - } - - @Override - public void writeFileMetadata(OrcProto.Metadata.Builder builder) throws IOException { - long startPosition = out.getPos(); - OrcProto.Metadata metadata = builder.build(); - writeMetadata(metadata); - this.metadataLength = (int) (out.getPos() - startPosition); - } - - @Override - public void writeFileFooter(OrcProto.Footer.Builder builder) throws IOException { - long bodyLength = out.getPos() - metadataLength; - builder.setContentLength(bodyLength); - builder.setHeaderLength(headerLength); - long startPosition = out.getPos(); - OrcProto.Footer footer = builder.build(); - writeFileFooter(footer); - this.footerLength = (int) (out.getPos() - startPosition); - } - - @Override - public long writePostScript(OrcProto.PostScript.Builder builder) throws IOException { - builder.setFooterLength(footerLength); - builder.setMetadataLength(metadataLength); - - OrcProto.PostScript ps = builder.build(); - // need to write this uncompressed - long startPosition = out.getPos(); - ps.writeTo(out); - long length = out.getPos() - startPosition; - - if (length > 255) { - throw new IllegalArgumentException("PostScript too large at " + length); + /** + * Computed the size of the given column on disk for this stripe. It excludes the index + * streams. 
+ * + * @param column a column id + * @return the total number of bytes + */ + public long getFileBytes(int column) { + long result = 0; + if (column >= rootColumn && column <= lastColumn) { + for (Map.Entry<StreamName, BufferedStream> entry : streams.entrySet()) { + StreamName name = entry.getKey(); + if (name.getColumn() == column && name.getArea() != StreamName.Area.INDEX) { + result += entry.getValue().getOutputSize(); + } + } + } + return result; } - - out.write((int) length); - return out.getPos(); - } - - @Override - public void close() { - // Just release the codec but don't close the internal stream here to avoid - // Stream Closed or ClosedChannelException when Flink performs checkpoint. - OrcCodecPool.returnCodec(compress, codec); - codec = null; } - @Override - public void flush() throws IOException { - out.flush(); - } - - @Override - public void appendRawStripe(ByteBuffer buffer, OrcProto.StripeInformation.Builder dirEntry) - throws IOException { - long start = out.getPos(); - int length = buffer.remaining(); - long availBlockSpace = blockSize - (start % blockSize); - - // see if stripe can fit in the current hdfs block, else pad the remaining - // space in the block - if (length < blockSize && length > availBlockSpace && addBlockPadding) { - byte[] pad = new byte[(int) Math.min(HDFS_BUFFER_SIZE, availBlockSpace)]; - LOG.info(String.format("Padding ORC by %d bytes while merging..", availBlockSpace)); - start += availBlockSpace; - while (availBlockSpace > 0) { - int writeLen = (int) Math.min(availBlockSpace, pad.length); - out.write(pad, 0, writeLen); - availBlockSpace -= writeLen; - } + VariantTracker getVariant(EncryptionVariant column) { + if (column == null) { + return unencrypted; } - - out.write(buffer.array(), buffer.arrayOffset() + buffer.position(), length); - dirEntry.setOffset(start); + return variants.get(column); } @Override - public CompressionCodec getCompressionCodec() { - return this.codec; + public long getFileBytes(int column, WriterEncryptionVariant variant) { + return getVariant(variant).getFileBytes(column); } @Override - public long getFileBytes(int column) { - long size = 0; - - for (final Map.Entry<StreamName, BufferedStream> pair : streams.entrySet()) { - final BufferedStream receiver = pair.getValue(); - if (!receiver.isSuppressed) { + public StreamOptions getStreamOptions() { + return unencrypted.options; + } - final StreamName name = pair.getKey(); - if (name.getColumn() == column && name.getArea() != StreamName.Area.INDEX) { - size += receiver.getOutputSize(); - } - } + private static void writeZeros(OutputStream output, long remaining) throws IOException { + while (remaining > 0) { + long size = Math.min(ZEROS.length, remaining); + output.write(ZEROS, 0, (int) size); + remaining -= size; } - - return size; } private void padStripe(long stripeSize) throws IOException { - this.stripeStart = out.getPos(); + this.stripeStart = rawWriter.getPos(); long previousBytesInBlock = (stripeStart - blockOffset) % blockSize; - // We only have options if this isn't the first stripe in the block if (previousBytesInBlock > 0) { if (previousBytesInBlock + stripeSize >= blockSize) { // Try making a short block - if (writeVariableLengthBlocks && shims.endVariableLengthBlock(out)) { + if (writeVariableLengthBlocks && shims.endVariableLengthBlock(rawWriter)) { blockOffset = stripeStart; } else if (addBlockPadding) { // if we cross the block boundary, figure out what we should do long padding = blockSize - previousBytesInBlock; if (padding <= maxPadding) { - 
writeZeros(out, padding); + writeZeros(rawWriter, padding); stripeStart += padding; } } @@ -303,62 +290,225 @@ public class PhysicalWriterImpl implements PhysicalWriter { } } + private static class DirectStream implements OutputReceiver { + private final FSDataOutputStream output; + + DirectStream(FSDataOutputStream output) { + this.output = output; + } + + @Override + public void output(ByteBuffer buffer) throws IOException { + output.write( + buffer.array(), buffer.arrayOffset() + buffer.position(), buffer.remaining()); + } + + @Override + public void suppress() { + throw new UnsupportedOperationException("Can't suppress direct stream"); + } + } + private void writeStripeFooter( OrcProto.StripeFooter footer, - long dataSize, - long indexSize, + SizeCounters sizes, OrcProto.StripeInformation.Builder dirEntry) throws IOException { writeStripeFooter(footer); - dirEntry.setOffset(stripeStart); - dirEntry.setFooterLength(out.getPos() - stripeStart - dataSize - indexSize); + dirEntry.setFooterLength(rawWriter.getPos() - stripeStart - sizes.total()); } protected void writeMetadata(OrcProto.Metadata metadata) throws IOException { - metadata.writeTo(protobufWriter); - protobufWriter.flush(); + metadata.writeTo(codedCompressStream); + codedCompressStream.flush(); writer.flush(); } protected void writeFileFooter(OrcProto.Footer footer) throws IOException { - footer.writeTo(protobufWriter); - protobufWriter.flush(); + footer.writeTo(codedCompressStream); + codedCompressStream.flush(); writer.flush(); } protected void writeStripeFooter(OrcProto.StripeFooter footer) throws IOException { - footer.writeTo(protobufWriter); - protobufWriter.flush(); + footer.writeTo(codedCompressStream); + codedCompressStream.flush(); writer.flush(); } - private static void writeZeros(OutputStream output, long remaining) throws IOException { - while (remaining > 0) { - long size = Math.min(ZEROS.length, remaining); - output.write(ZEROS, 0, (int) size); - remaining -= size; + static void writeEncryptedStripeStatistics( + DirectStream output, int stripeNumber, VariantTracker tracker) throws IOException { + StreamOptions options = new StreamOptions(tracker.options); + tracker.stripeStatsStreams.clear(); + for (int col = tracker.rootColumn; + col < tracker.rootColumn + tracker.stripeStats.length; + ++col) { + options.modifyIv( + CryptoUtils.modifyIvForStream( + col, OrcProto.Stream.Kind.STRIPE_STATISTICS, stripeNumber + 1)); + OutStream stream = new OutStream("stripe stats for " + col, options, output); + OrcProto.ColumnarStripeStatistics stats = + OrcProto.ColumnarStripeStatistics.newBuilder() + .addAllColStats(tracker.stripeStats[col - tracker.rootColumn]) + .build(); + long start = output.output.getPos(); + stats.writeTo(stream); + stream.flush(); + OrcProto.Stream description = + OrcProto.Stream.newBuilder() + .setColumn(col) + .setKind(OrcProto.Stream.Kind.STRIPE_STATISTICS) + .setLength(output.output.getPos() - start) + .build(); + tracker.stripeStatsStreams.add(description); } } - private static class DirectStream implements OutputReceiver { - private final FSDataOutputStream output; + static void setUnencryptedStripeStatistics( + OrcProto.Metadata.Builder builder, + int stripeCount, + List<OrcProto.ColumnStatistics>[] stats) { + // Make the unencrypted stripe stats into lists of StripeStatistics. 
+ builder.clearStripeStats(); + for (int s = 0; s < stripeCount; ++s) { + OrcProto.StripeStatistics.Builder stripeStats = OrcProto.StripeStatistics.newBuilder(); + for (List<OrcProto.ColumnStatistics> col : stats) { + stripeStats.addColStats(col.get(s)); + } + builder.addStripeStats(stripeStats.build()); + } + } - DirectStream(FSDataOutputStream output) { - this.output = output; + static void setEncryptionStatistics( + OrcProto.Encryption.Builder encryption, + int stripeNumber, + Collection<VariantTracker> variants) + throws IOException { + int v = 0; + for (VariantTracker variant : variants) { + OrcProto.EncryptionVariant.Builder variantBuilder = encryption.getVariantsBuilder(v++); + + // Add the stripe statistics streams to the variant description. + variantBuilder.clearStripeStatistics(); + variantBuilder.addAllStripeStatistics(variant.stripeStatsStreams); + + // Serialize and encrypt the file statistics. + OrcProto.FileStatistics.Builder file = OrcProto.FileStatistics.newBuilder(); + for (OrcProto.ColumnStatistics col : variant.fileStats) { + file.addColumn(col); + } + StreamOptions options = new StreamOptions(variant.options); + options.modifyIv( + CryptoUtils.modifyIvForStream( + variant.rootColumn, + OrcProto.Stream.Kind.FILE_STATISTICS, + stripeNumber + 1)); + BufferedStream buffer = new BufferedStream(); + OutStream stream = new OutStream("stats for " + variant, options, buffer); + file.build().writeTo(stream); + stream.flush(); + variantBuilder.setFileStatistics(buffer.getBytes()); } + } - public void output(ByteBuffer buffer) throws IOException { - this.output.write( - buffer.array(), buffer.arrayOffset() + buffer.position(), buffer.remaining()); + @Override + public void writeFileMetadata(OrcProto.Metadata.Builder builder) throws IOException { + long stripeStatisticsStart = rawWriter.getPos(); + for (VariantTracker variant : variants.values()) { + writeEncryptedStripeStatistics(rawStream, stripeNumber, variant); } + setUnencryptedStripeStatistics(builder, stripeNumber, unencrypted.stripeStats); + long metadataStart = rawWriter.getPos(); + writeMetadata(builder.build()); + this.stripeStatisticsLength = (int) (metadataStart - stripeStatisticsStart); + this.metadataLength = (int) (rawWriter.getPos() - metadataStart); + } - public void suppress() { - throw new UnsupportedOperationException("Can't suppress direct stream"); + static void addUnencryptedStatistics( + OrcProto.Footer.Builder builder, OrcProto.ColumnStatistics[] stats) { + for (OrcProto.ColumnStatistics stat : stats) { + builder.addStatistics(stat); + } + } + + @Override + public void writeFileFooter(OrcProto.Footer.Builder builder) throws IOException { + if (!variants.isEmpty()) { + OrcProto.Encryption.Builder encryption = builder.getEncryptionBuilder(); + setEncryptionStatistics(encryption, stripeNumber, variants.values()); } + addUnencryptedStatistics(builder, unencrypted.fileStats); + long bodyLength = rawWriter.getPos() - metadataLength - stripeStatisticsLength; + builder.setContentLength(bodyLength); + builder.setHeaderLength(headerLength); + long startPosn = rawWriter.getPos(); + OrcProto.Footer footer = builder.build(); + writeFileFooter(footer); + this.footerLength = (int) (rawWriter.getPos() - startPosn); } - private static final class BufferedStream implements OutputReceiver { + @Override + public long writePostScript(OrcProto.PostScript.Builder builder) throws IOException { + builder.setFooterLength(footerLength); + builder.setMetadataLength(metadataLength); + if (!variants.isEmpty()) { + 
builder.setStripeStatisticsLength(stripeStatisticsLength); + } + OrcProto.PostScript ps = builder.build(); + // need to write this uncompressed + long startPosn = rawWriter.getPos(); + ps.writeTo(rawWriter); + long length = rawWriter.getPos() - startPosn; + if (length > 255) { + throw new IllegalArgumentException("PostScript too large at " + length); + } + rawWriter.write((int) length); + return rawWriter.getPos(); + } + + @Override + public void close() throws IOException { + // Just release the codec but don't close the internal stream here to avoid + // Stream Closed or ClosedChannelException when Flink performs checkpoint. + + CompressionCodec codec = compress.getCodec(); + if (codec != null) { + OrcCodecPool.returnCodec(codec.getKind(), codec); + } + compress.withCodec(null, null); + } + + @Override + public void flush() throws IOException { + rawWriter.flush(); + } + + @Override + public void appendRawStripe(ByteBuffer buffer, OrcProto.StripeInformation.Builder dirEntry) + throws IOException { + long start = rawWriter.getPos(); + int length = buffer.remaining(); + long availBlockSpace = blockSize - (start % blockSize); + + // see if stripe can fit in the current hdfs block, else pad the remaining + // space in the block + if (length < blockSize && length > availBlockSpace && addBlockPadding) { + byte[] pad = new byte[(int) Math.min(HDFS_BUFFER_SIZE, availBlockSpace)]; + LOG.debug("Padding ORC by {} bytes while merging", availBlockSpace); + start += availBlockSpace; + while (availBlockSpace > 0) { + int writeLen = (int) Math.min(availBlockSpace, pad.length); + rawWriter.write(pad, 0, writeLen); + availBlockSpace -= writeLen; + } + } + rawWriter.write(buffer.array(), buffer.arrayOffset() + buffer.position(), length); + dirEntry.setOffset(start); + stripeNumber += 1; + } + + static final class BufferedStream implements OutputReceiver { private boolean isSuppressed = false; private final List<ByteBuffer> output = new ArrayList<>(); @@ -369,12 +519,18 @@ public class PhysicalWriterImpl implements PhysicalWriter { } } + @Override public void suppress() { isSuppressed = true; output.clear(); } - void spillToDiskAndClear(FSDataOutputStream raw) throws IOException { + /** + * Write any saved buffers to the OutputStream if needed, and clears all the buffers. + * + * @return true if the stream was written + */ + boolean spillToDiskAndClear(FSDataOutputStream raw) throws IOException { if (!isSuppressed) { for (ByteBuffer buffer : output) { raw.write( @@ -383,10 +539,38 @@ public class PhysicalWriterImpl implements PhysicalWriter { buffer.remaining()); } output.clear(); + return true; } isSuppressed = false; + return false; + } + + /** + * Get the buffer as a protobuf ByteString and clears the BufferedStream. + * + * @return the bytes + */ + ByteString getBytes() { + int len = output.size(); + if (len == 0) { + return ByteString.EMPTY; + } else { + ByteString result = ByteString.copyFrom(output.get(0)); + for (int i = 1; i < output.size(); ++i) { + result = result.concat(ByteString.copyFrom(output.get(i))); + } + output.clear(); + return result; + } } + /** + * Get the number of bytes that will be written to the output. + * + * <p>Assumes the stream writing into this receiver has already been flushed. 
+ * + * @return number of bytes + */ public long getOutputSize() { long result = 0; for (ByteBuffer buffer : output) { @@ -395,4 +579,141 @@ public class PhysicalWriterImpl implements PhysicalWriter { return result; } } + + static class SizeCounters { + long index = 0; + long data = 0; + + long total() { + return index + data; + } + } + + void buildStreamList(OrcProto.StripeFooter.Builder footerBuilder, SizeCounters sizes) + throws IOException { + footerBuilder.addAllStreams(unencrypted.placeStreams(StreamName.Area.INDEX, sizes)); + final long unencryptedIndexSize = sizes.index; + int v = 0; + for (VariantTracker variant : variants.values()) { + OrcProto.StripeEncryptionVariant.Builder builder = + footerBuilder.getEncryptionBuilder(v++); + builder.addAllStreams(variant.placeStreams(StreamName.Area.INDEX, sizes)); + } + if (sizes.index != unencryptedIndexSize) { + // add a placeholder that covers the hole where the encrypted indexes are + footerBuilder.addStreams( + OrcProto.Stream.newBuilder() + .setKind(OrcProto.Stream.Kind.ENCRYPTED_INDEX) + .setLength(sizes.index - unencryptedIndexSize)); + } + footerBuilder.addAllStreams(unencrypted.placeStreams(StreamName.Area.DATA, sizes)); + final long unencryptedDataSize = sizes.data; + v = 0; + for (VariantTracker variant : variants.values()) { + OrcProto.StripeEncryptionVariant.Builder builder = + footerBuilder.getEncryptionBuilder(v++); + builder.addAllStreams(variant.placeStreams(StreamName.Area.DATA, sizes)); + } + if (sizes.data != unencryptedDataSize) { + // add a placeholder that covers the hole where the encrypted indexes are + footerBuilder.addStreams( + OrcProto.Stream.newBuilder() + .setKind(OrcProto.Stream.Kind.ENCRYPTED_DATA) + .setLength(sizes.data - unencryptedDataSize)); + } + } + + @Override + public void finalizeStripe( + OrcProto.StripeFooter.Builder footerBuilder, + OrcProto.StripeInformation.Builder dirEntry) + throws IOException { + SizeCounters sizes = new SizeCounters(); + buildStreamList(footerBuilder, sizes); + + OrcProto.StripeFooter footer = footerBuilder.build(); + + // Do we need to pad the file so the stripe doesn't straddle a block boundary? + padStripe(sizes.total() + footer.getSerializedSize()); + + // write the unencrypted index streams + unencrypted.writeStreams(StreamName.Area.INDEX, rawWriter); + // write the encrypted index streams + for (VariantTracker variant : variants.values()) { + variant.writeStreams(StreamName.Area.INDEX, rawWriter); + } + + // write the unencrypted data streams + unencrypted.writeStreams(StreamName.Area.DATA, rawWriter); + // write out the unencrypted data streams + for (VariantTracker variant : variants.values()) { + variant.writeStreams(StreamName.Area.DATA, rawWriter); + } + + // Write out the footer. 
+ writeStripeFooter(footer, sizes, dirEntry); + + // fill in the data sizes + dirEntry.setDataLength(sizes.data); + dirEntry.setIndexLength(sizes.index); + + stripeNumber += 1; + } + + @Override + public void writeHeader() throws IOException { + rawWriter.write(OrcFile.MAGIC.getBytes()); + headerLength = rawWriter.getPos(); + } + + @Override + public BufferedStream createDataStream(StreamName name) { + VariantTracker variant = getVariant(name.getEncryption()); + BufferedStream result = variant.streams.get(name); + if (result == null) { + result = new BufferedStream(); + variant.streams.put(name, result); + } + return result; + } + + protected OutputStream createIndexStream(StreamName name) { + BufferedStream buffer = createDataStream(name); + VariantTracker tracker = getVariant(name.getEncryption()); + StreamOptions options = + SerializationUtils.getCustomizedCodec( + tracker.options, compressionStrategy, name.getKind()); + if (options.isEncrypted()) { + if (options == tracker.options) { + options = new StreamOptions(options); + } + options.modifyIv(CryptoUtils.modifyIvForStream(name, stripeNumber + 1)); + } + return new OutStream(name.toString(), options, buffer); + } + + @Override + public void writeIndex(StreamName name, OrcProto.RowIndex.Builder index) throws IOException { + OutputStream stream = createIndexStream(name); + index.build().writeTo(stream); + stream.flush(); + } + + @Override + public void writeBloomFilter(StreamName name, OrcProto.BloomFilterIndex.Builder bloom) + throws IOException { + OutputStream stream = createIndexStream(name); + bloom.build().writeTo(stream); + stream.flush(); + } + + @Override + public void writeStatistics(StreamName name, OrcProto.ColumnStatistics.Builder statistics) { + VariantTracker tracker = getVariant(name.getEncryption()); + if (name.getKind() == OrcProto.Stream.Kind.FILE_STATISTICS) { + tracker.fileStats[name.getColumn() - tracker.rootColumn] = statistics.build(); + } else { + tracker.stripeStats[name.getColumn() - tracker.rootColumn].add(statistics.build()); + } + } } diff --git a/flink-formats/flink-orc/src/test/java/org/apache/flink/orc/OrcColumnarRowInputFormatTest.java b/flink-formats/flink-orc/src/test/java/org/apache/flink/orc/OrcColumnarRowInputFormatTest.java index 2b1458c81792..04d6bd74ab51 100644 --- a/flink-formats/flink-orc/src/test/java/org/apache/flink/orc/OrcColumnarRowInputFormatTest.java +++ b/flink-formats/flink-orc/src/test/java/org/apache/flink/orc/OrcColumnarRowInputFormatTest.java @@ -204,20 +204,26 @@ class OrcColumnarRowInputFormatTest { RowType tableType = RowType.of( - /* 0 */ DataTypes.INT().getLogicalType(), - /* 1 */ DataTypes.INT().getLogicalType(), // part-1 - /* 2 */ DataTypes.STRING().getLogicalType(), - /* 3 */ DataTypes.BIGINT().getLogicalType(), // part-2 - /* 4 */ DataTypes.STRING().getLogicalType(), - /* 5 */ DataTypes.STRING().getLogicalType(), // part-3 - /* 6 */ DataTypes.STRING().getLogicalType(), - /* 7 */ DataTypes.INT().getLogicalType(), - /* 8 */ DataTypes.DECIMAL(10, 5).getLogicalType(), // part-4 - /* 9 */ DataTypes.STRING().getLogicalType(), - /* 11*/ DataTypes.INT().getLogicalType(), - /* 12*/ DataTypes.INT().getLogicalType(), - /* 13*/ DataTypes.STRING().getLogicalType(), // part-5 - /* 14*/ DataTypes.INT().getLogicalType()); + new LogicalType[] { + /* 0 */ DataTypes.INT().getLogicalType(), + /* 1 */ DataTypes.INT().getLogicalType(), // part-1 + /* 2 */ DataTypes.STRING().getLogicalType(), + /* 3 */ DataTypes.BIGINT().getLogicalType(), // part-2 + /* 4 */ 
DataTypes.STRING().getLogicalType(), + /* 5 */ DataTypes.STRING().getLogicalType(), // part-3 + /* 6 */ DataTypes.STRING().getLogicalType(), + /* 7 */ DataTypes.INT().getLogicalType(), + /* 8 */ DataTypes.DECIMAL(10, 5).getLogicalType(), // part-4 + /* 9 */ DataTypes.STRING().getLogicalType(), + /* 10*/ DataTypes.INT().getLogicalType(), + /* 11*/ DataTypes.INT().getLogicalType(), + /* 12*/ DataTypes.STRING().getLogicalType(), // part-5 + /* 13*/ DataTypes.INT().getLogicalType() + }, + new String[] { + "_col0", "f1", "_col1", "f3", "_col2", "f5", "_col3", "_col4", "f8", + "_col5", "_col6", "_col7", "f13", "_col8" + }); int[] projectedFields = {8, 1, 3, 0, 5, 2}; diff --git a/flink-formats/flink-orc/src/test/java/org/apache/flink/orc/OrcColumnarRowSplitReaderTest.java b/flink-formats/flink-orc/src/test/java/org/apache/flink/orc/OrcColumnarRowSplitReaderTest.java index bf0418c657f8..a7fd08c343d2 100644 --- a/flink-formats/flink-orc/src/test/java/org/apache/flink/orc/OrcColumnarRowSplitReaderTest.java +++ b/flink-formats/flink-orc/src/test/java/org/apache/flink/orc/OrcColumnarRowSplitReaderTest.java @@ -73,6 +73,11 @@ public class OrcColumnarRowSplitReaderTest { DataTypes.INT() }; + private final String[] testSchemaNameFlat = + new String[] { + "_col0", "_col1", "_col2", "_col3", "_col4", "_col5", "_col6", "_col7", "_col8" + }; + private final DataType[] testSchemaDecimal = new DataType[] {DataTypes.DECIMAL(10, 5)}; private static Path testFileFlat; @@ -97,7 +102,12 @@ public class OrcColumnarRowSplitReaderTest { for (FileInputSplit split : splits) { try (OrcColumnarRowSplitReader reader = - createReader(new int[] {0, 1}, testSchemaFlat, new HashMap<>(), split)) { + createReader( + new int[] {0, 1}, + testSchemaFlat, + testSchemaNameFlat, + new HashMap<>(), + split)) { // read and count all rows while (!reader.reachedEnd()) { RowData row = reader.nextRecord(null); @@ -119,7 +129,12 @@ public class OrcColumnarRowSplitReaderTest { FileInputSplit[] splits = createSplits(testFileDecimal, 1); try (OrcColumnarRowSplitReader reader = - createReader(new int[] {0}, testSchemaDecimal, new HashMap<>(), splits[0])) { + createReader( + new int[] {0}, + testSchemaDecimal, + new String[] {"_col0"}, + new HashMap<>(), + splits[0])) { assertThat(reader.reachedEnd()).isFalse(); RowData row = reader.nextRecord(null); @@ -176,10 +191,14 @@ public class OrcColumnarRowSplitReaderTest { /* 7 */ DataTypes.INT(), /* 8 */ DataTypes.DECIMAL(10, 5), // part-4 /* 9 */ DataTypes.STRING(), + /* 10*/ DataTypes.INT(), /* 11*/ DataTypes.INT(), - /* 12*/ DataTypes.INT(), - /* 13*/ DataTypes.STRING(), // part-5 - /* 14*/ DataTypes.INT() + /* 12*/ DataTypes.STRING(), // part-5 + /* 13*/ DataTypes.INT() + }, + new String[] { + "_col0", "f1", "_col1", "f3", "_col2", "f5", "_col3", "_col4", "f8", + "_col5", "_col6", "_col7", "f13", "_col8" }, partSpec, split)) { @@ -222,7 +241,12 @@ public class OrcColumnarRowSplitReaderTest { for (FileInputSplit split : splits) { try (OrcColumnarRowSplitReader reader = - createReader(new int[] {2, 0, 1}, testSchemaFlat, new HashMap<>(), split)) { + createReader( + new int[] {2, 0, 1}, + testSchemaFlat, + testSchemaNameFlat, + new HashMap<>(), + split)) { // read and count all rows while (!reader.reachedEnd()) { RowData row = reader.nextRecord(null); @@ -403,10 +427,25 @@ public class OrcColumnarRowSplitReaderTest { Map<String, Object> partitionSpec, FileInputSplit split) throws IOException { + return createReader( + selectedFields, + fullTypes, + IntStream.range(0, fullTypes.length).mapToObj(i -> 
"f" + i).toArray(String[]::new), + partitionSpec, + split); + } + + protected OrcColumnarRowSplitReader createReader( + int[] selectedFields, + DataType[] fullTypes, + String[] fullNames, + Map<String, Object> partitionSpec, + FileInputSplit split) + throws IOException { return OrcSplitReaderUtil.genPartColumnarRowReader( "2.3.0", new Configuration(), - IntStream.range(0, fullTypes.length).mapToObj(i -> "f" + i).toArray(String[]::new), + fullNames, fullTypes, partitionSpec, selectedFields, diff --git a/flink-formats/flink-orc/src/test/java/org/apache/flink/orc/OrcFormatStatisticsReportTest.java b/flink-formats/flink-orc/src/test/java/org/apache/flink/orc/OrcFormatStatisticsReportTest.java index f2d2bdaf65ad..17e912c2c8e9 100644 --- a/flink-formats/flink-orc/src/test/java/org/apache/flink/orc/OrcFormatStatisticsReportTest.java +++ b/flink-formats/flink-orc/src/test/java/org/apache/flink/orc/OrcFormatStatisticsReportTest.java @@ -206,10 +206,10 @@ public class OrcFormatStatisticsReportTest extends StatisticsReportTestBase { "f_timestamp9", new ColumnStats.Builder() .setMax( - DateTimeUtils.parseTimestampData("1990-10-16 12:12:43.123", 3) + DateTimeUtils.parseTimestampData("1990-10-16 12:12:43.123456789", 9) .toTimestamp()) .setMin( - DateTimeUtils.parseTimestampData("1990-10-14 12:12:43.123", 3) + DateTimeUtils.parseTimestampData("1990-10-14 12:12:43.123456789", 9) .toTimestamp()) .setNullCount(0L) .build()); diff --git a/flink-formats/flink-sql-orc/src/main/resources/META-INF/NOTICE b/flink-formats/flink-sql-orc/src/main/resources/META-INF/NOTICE index 8554030115d6..872bd7125355 100644 --- a/flink-formats/flink-sql-orc/src/main/resources/META-INF/NOTICE +++ b/flink-formats/flink-sql-orc/src/main/resources/META-INF/NOTICE @@ -6,13 +6,13 @@ The Apache Software Foundation (http://www.apache.org/). This project bundles the following dependencies under the Apache Software License 2.0. (http://www.apache.org/licenses/LICENSE-2.0.txt) -- org.apache.orc:orc-core:1.5.6 -- org.apache.orc:orc-shims:1.5.6 -- org.apache.hive:hive-storage-api:2.6.0 -- io.airlift:aircompressor:0.10 +- org.apache.orc:orc-core:1.9.4 +- org.apache.orc:orc-shims:1.9.4 +- org.apache.hive:hive-storage-api:2.8.1 +- io.airlift:aircompressor:0.27 - commons-lang:commons-lang:2.6 This project bundles the following dependencies under the BSD license. See bundled license files for details. -- com.google.protobuf:protobuf-java:2.5.0 +- com.google.protobuf:protobuf-java:3.21.7 diff --git a/pom.xml b/pom.xml index 165a3dc038a1..8ade0d9b7a1f 100644 --- a/pom.xml +++ b/pom.xml @@ -170,7 +170,8 @@ under the License. --> <minikdc.version>3.2.4</minikdc.version> <hive.version>2.3.10</hive.version> - <orc.version>1.5.6</orc.version> + <orc.version>1.9.4</orc.version> + <storage-api.version>2.8.1</storage-api.version> <japicmp.referenceVersion>1.20.0</japicmp.referenceVersion> <japicmp.outputDir>tools/japicmp-output</japicmp.outputDir> <checkstyle.version>10.18.2</checkstyle.version>