This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch branch-1.7
in repository https://gitbox.apache.org/repos/asf/orc.git
The following commit(s) were added to refs/heads/branch-1.7 by this push:
new a85b4c885 ORC-1198: Add a new `PhysicalFsWriter` constructor with
`FSDataOutputStream` parameter
a85b4c885 is described below
commit a85b4c8852a894a701ddb73c15fb84ed1035abb9
Author: Jia Liu <[email protected]>
AuthorDate: Fri Jun 10 12:33:40 2022 -0700
ORC-1198: Add a new `PhysicalFsWriter` constructor with
`FSDataOutputStream` parameter
### What changes were proposed in this pull request?
Add a constructor with a stream parameter to PhysicalFsWriter.
### Why are the changes needed?
The PhysicalFsWriter implementation works on the basis of a Path, but Flink's
bulk writer is based on a stream.
In order to integrate with Flink more elegantly, I think a constructor
with a stream parameter should be added to PhysicalFsWriter.
### How was this patch tested?
It does not change the behavior of PhysicalFsWriter and passes all existing
test cases.
Closes #1153 from liujiawinds/main.
Authored-by: Jia Liu <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
(cherry picked from commit 0565bfe36babedeed1ea65ad62927de7ff9e3ed7)
Signed-off-by: Dongjoon Hyun <[email protected]>
---
.../java/org/apache/orc/impl/PhysicalFsWriter.java | 23 ++++++++++++++++------
1 file changed, 17 insertions(+), 6 deletions(-)
diff --git a/java/core/src/java/org/apache/orc/impl/PhysicalFsWriter.java b/java/core/src/java/org/apache/orc/impl/PhysicalFsWriter.java
index 47744dce2..ad6c19878 100644
--- a/java/core/src/java/org/apache/orc/impl/PhysicalFsWriter.java
+++ b/java/core/src/java/org/apache/orc/impl/PhysicalFsWriter.java
@@ -57,7 +57,7 @@ public class PhysicalFsWriter implements PhysicalWriter {
// a protobuf outStream around streamFactory
private CodedOutputStream codedCompressStream;
- private final Path path;
+ private Path path;
private final HadoopShims shims;
private final long blockSize;
private final int maxPadding;
@@ -91,7 +91,18 @@ public class PhysicalFsWriter implements PhysicalWriter {
OrcFile.WriterOptions opts,
WriterEncryptionVariant[] encryption
) throws IOException {
+ this(fs.create(path, opts.getOverwrite(), HDFS_BUFFER_SIZE,
+ fs.getDefaultReplication(path), opts.getBlockSize()), opts, encryption);
this.path = path;
+ LOG.info("ORC writer created for path: {} with stripeSize: {} blockSize: {}" +
+ " compression: {}", path, opts.getStripeSize(), blockSize, compress);
+ }
+
+ public PhysicalFsWriter(FSDataOutputStream outputStream,
+ OrcFile.WriterOptions opts,
+ WriterEncryptionVariant[] encryption
+ ) throws IOException {
+ this.rawWriter = outputStream;
long defaultStripeSize = opts.getStripeSize();
this.addBlockPadding = opts.getBlockPadding();
if (opts.isEnforceBufferSize()) {
@@ -109,10 +120,6 @@ public class PhysicalFsWriter implements PhysicalWriter {
this.compressionStrategy = opts.getCompressionStrategy();
this.maxPadding = (int) (opts.getPaddingTolerance() * defaultStripeSize);
this.blockSize = opts.getBlockSize();
- LOG.info("ORC writer created for path: {} with stripeSize: {} blockSize: {}" +
- " compression: {}", path, defaultStripeSize, blockSize, compress);
- rawWriter = fs.create(path, opts.getOverwrite(), HDFS_BUFFER_SIZE,
- fs.getDefaultReplication(path), blockSize);
blockOffset = 0;
unencrypted = new VariantTracker(opts.getSchema(), compress);
writeVariableLengthBlocks = opts.getWriteVariableLengthBlocks();
@@ -763,6 +770,10 @@ public class PhysicalFsWriter implements PhysicalWriter {
@Override
public String toString() {
- return path.toString();
+ if (path != null) {
+ return path.toString();
+ } else {
+ return ByteString.EMPTY.toString();
+ }
}
}