This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch branch-1.7
in repository https://gitbox.apache.org/repos/asf/orc.git


The following commit(s) were added to refs/heads/branch-1.7 by this push:
     new a85b4c885 ORC-1198: Add a new `PhysicalFsWriter` constructor with 
`FSDataOutputStream` parameter
a85b4c885 is described below

commit a85b4c8852a894a701ddb73c15fb84ed1035abb9
Author: Jia Liu <[email protected]>
AuthorDate: Fri Jun 10 12:33:40 2022 -0700

    ORC-1198: Add a new `PhysicalFsWriter` constructor with 
`FSDataOutputStream` parameter
    
    ### What changes were proposed in this pull request?
    Add stream parameter constructor for PhysicalFsWriter.
    
    ### Why are the changes needed?
    PhysicalFsWriter implementation works on the basis of a Path, but Flink's 
bulk writer based on stream.
    In order to integrate with flink more elegantly,  I think a constructor 
with stream parameter should be added to PhysicalFsWriter
    
    ### How was this patch tested?
    It won't change behavior of PhysicalFsWriter and passed all existed test 
cases.
    
    Closes #1153 from liujiawinds/main.
    
    Authored-by: Jia Liu <[email protected]>
    Signed-off-by: Dongjoon Hyun <[email protected]>
    (cherry picked from commit 0565bfe36babedeed1ea65ad62927de7ff9e3ed7)
    Signed-off-by: Dongjoon Hyun <[email protected]>
---
 .../java/org/apache/orc/impl/PhysicalFsWriter.java | 23 ++++++++++++++++------
 1 file changed, 17 insertions(+), 6 deletions(-)

diff --git a/java/core/src/java/org/apache/orc/impl/PhysicalFsWriter.java 
b/java/core/src/java/org/apache/orc/impl/PhysicalFsWriter.java
index 47744dce2..ad6c19878 100644
--- a/java/core/src/java/org/apache/orc/impl/PhysicalFsWriter.java
+++ b/java/core/src/java/org/apache/orc/impl/PhysicalFsWriter.java
@@ -57,7 +57,7 @@ public class PhysicalFsWriter implements PhysicalWriter {
   // a protobuf outStream around streamFactory
   private CodedOutputStream codedCompressStream;
 
-  private final Path path;
+  private Path path;
   private final HadoopShims shims;
   private final long blockSize;
   private final int maxPadding;
@@ -91,7 +91,18 @@ public class PhysicalFsWriter implements PhysicalWriter {
                           OrcFile.WriterOptions opts,
                           WriterEncryptionVariant[] encryption
                           ) throws IOException {
+    this(fs.create(path, opts.getOverwrite(), HDFS_BUFFER_SIZE,
+            fs.getDefaultReplication(path), opts.getBlockSize()), opts, 
encryption);
     this.path = path;
+    LOG.info("ORC writer created for path: {} with stripeSize: {} blockSize: 
{}" +
+            " compression: {}", path, opts.getStripeSize(), blockSize, 
compress);
+  }
+
+  public PhysicalFsWriter(FSDataOutputStream outputStream,
+                          OrcFile.WriterOptions opts,
+                          WriterEncryptionVariant[] encryption
+                          ) throws IOException {
+    this.rawWriter = outputStream;
     long defaultStripeSize = opts.getStripeSize();
     this.addBlockPadding = opts.getBlockPadding();
     if (opts.isEnforceBufferSize()) {
@@ -109,10 +120,6 @@ public class PhysicalFsWriter implements PhysicalWriter {
     this.compressionStrategy = opts.getCompressionStrategy();
     this.maxPadding = (int) (opts.getPaddingTolerance() * defaultStripeSize);
     this.blockSize = opts.getBlockSize();
-    LOG.info("ORC writer created for path: {} with stripeSize: {} blockSize: 
{}" +
-        " compression: {}", path, defaultStripeSize, blockSize, compress);
-    rawWriter = fs.create(path, opts.getOverwrite(), HDFS_BUFFER_SIZE,
-        fs.getDefaultReplication(path), blockSize);
     blockOffset = 0;
     unencrypted = new VariantTracker(opts.getSchema(), compress);
     writeVariableLengthBlocks = opts.getWriteVariableLengthBlocks();
@@ -763,6 +770,10 @@ public class PhysicalFsWriter implements PhysicalWriter {
 
   @Override
   public String toString() {
-    return path.toString();
+    if (path != null) {
+      return path.toString();
+    } else {
+      return ByteString.EMPTY.toString();
+    }
   }
 }

Reply via email to