This is an automated email from the ASF dual-hosted git repository.

wyk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/asterixdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 49824f58b6 [ASTERIXDB-3453][STO] Incompressible pages are not written as full pages
49824f58b6 is described below

commit 49824f58b68dd6621ba41754e8e07d1cab356c70
Author: Wail Alkowaileet <[email protected]>
AuthorDate: Sun Jul 7 16:42:16 2024 -0700

    [ASTERIXDB-3453][STO] Incompressible pages are not written as full pages
    
    - user model changes: no
    - storage format changes: no
    - interface changes: yes
    
    Details:
    Incompressible pages must be written as full pages (i.e., page +
    header) in their entirety so that the write positions of the cloud
    files stay in sync with those of the local files.
    
    Change-Id: Iccebe6fcab375d064825ab2e9343b96daf8afbc6
    Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/18446
    Integration-Tests: Jenkins <[email protected]>
    Reviewed-by: Wail Alkowaileet <[email protected]>
    Reviewed-by: Murtadha Hubail <[email protected]>
    Tested-by: Jenkins <[email protected]>
---
 .../asterix/cloud/AbstractCloudIOManager.java      | 38 ++++++++++++++++++----
 .../asterix/cloud/CloudResettableInputStream.java  | 14 ++++++--
 .../apache/asterix/cloud/clients/ICloudWriter.java |  5 +++
 .../cloud/clients/google/gcs/GCSWriter.java        | 14 +++++++-
 .../context/DefaultCloudOnlyWriteContext.java      |  4 +--
 .../apache/hyracks/cloud/io/ICloudIOManager.java   |  6 ++--
 .../common/file/CompressedBufferedFileHandle.java  |  6 ++--
 7 files changed, 71 insertions(+), 16 deletions(-)

diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/AbstractCloudIOManager.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/AbstractCloudIOManager.java
index 91c24e87b6..033f13511b 100644
--- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/AbstractCloudIOManager.java
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/AbstractCloudIOManager.java
@@ -210,10 +210,11 @@ public abstract class AbstractCloudIOManager extends IOManager implements IParti
     }
 
     @Override
-    public final int cloudWrite(IFileHandle fHandle, ByteBuffer data) throws HyracksDataException {
+    public final int cloudWrite(IFileHandle fHandle, long offset, ByteBuffer data) throws HyracksDataException {
         ICloudWriter cloudWriter = ((CloudFileHandle) fHandle).getCloudWriter();
         int writtenBytes;
         try {
+            ensurePosition(fHandle, cloudWriter.position(), offset);
             writtenBytes = cloudWriter.write(data);
         } catch (HyracksDataException e) {
             cloudWriter.abort();
@@ -223,10 +224,11 @@ public abstract class AbstractCloudIOManager extends IOManager implements IParti
     }
 
     @Override
-    public final long cloudWrite(IFileHandle fHandle, ByteBuffer[] data) throws HyracksDataException {
+    public final long cloudWrite(IFileHandle fHandle, long offset, ByteBuffer[] data) throws HyracksDataException {
         ICloudWriter cloudWriter = ((CloudFileHandle) fHandle).getCloudWriter();
         int writtenBytes;
         try {
+            ensurePosition(fHandle, cloudWriter.position(), offset);
             writtenBytes = cloudWriter.write(data[0], data[1]);
         } catch (HyracksDataException e) {
             cloudWriter.abort();
@@ -265,18 +267,33 @@ public abstract class AbstractCloudIOManager extends IOManager implements IParti
     @Override
     public final long doSyncWrite(IFileHandle fHandle, long offset, ByteBuffer[] dataArray)
             throws HyracksDataException {
+        // Save original positions; the local write advances them
+        ByteBuffer buffer1 = dataArray[0];
+        int position1 = buffer1.position();
+
+        ByteBuffer buffer2 = dataArray[1];
+        int position2 = buffer2.position();
+
         long writtenBytes = localIoManager.doSyncWrite(fHandle, offset, dataArray);
-        dataArray[0].flip();
-        dataArray[1].flip();
-        cloudWrite(fHandle, dataArray);
+
+        // Restore original positions
+        buffer1.position(position1);
+        buffer2.position(position2);
+
+        cloudWrite(fHandle, offset, dataArray);
         return writtenBytes;
     }
 
     @Override
     public final int doSyncWrite(IFileHandle fHandle, long offset, ByteBuffer data) throws HyracksDataException {
+        // Save original position; the local write advances it
+        int position = data.position();
+
         int writtenBytes = localIoManager.doSyncWrite(fHandle, offset, data);
-        data.flip();
-        cloudWrite(fHandle, data);
+
+        // Restore original position
+        data.position(position);
+        cloudWrite(fHandle, offset, data);
         return writtenBytes;
     }
 
@@ -390,4 +407,11 @@ public abstract class AbstractCloudIOManager extends IOManager implements IParti
             performBulkOperation(deleteBulkOperation);
         }
     }
+
+    private void ensurePosition(IFileHandle fileHandle, long cloudOffset, long requestedWriteOffset) {
+        if (cloudOffset != requestedWriteOffset) {
+            throw new IllegalStateException("Misaligned positions in " + fileHandle.getFileReference()
+                    + ", cloudOffset: " + cloudOffset + " != requestedWriteOffset: " + requestedWriteOffset);
+        }
+    }
 }
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/CloudResettableInputStream.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/CloudResettableInputStream.java
index cace898928..a233ca5afb 100644
--- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/CloudResettableInputStream.java
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/CloudResettableInputStream.java
@@ -36,10 +36,12 @@ public class CloudResettableInputStream extends InputStream implements ICloudWri
     private final IWriteBufferProvider bufferProvider;
     private final ICloudBufferedWriter bufferedWriter;
     private ByteBuffer writeBuffer;
+    private long writtenBytes;
 
     public CloudResettableInputStream(ICloudBufferedWriter bufferedWriter, IWriteBufferProvider bufferProvider) {
         this.bufferedWriter = bufferedWriter;
         this.bufferProvider = bufferProvider;
+        writtenBytes = 0;
     }
 
     /* ************************************************************
@@ -75,7 +77,7 @@ public class CloudResettableInputStream extends InputStream implements ICloudWri
     @Override
     public int write(ByteBuffer page) throws HyracksDataException {
         open();
-        return write(page.array(), 0, page.limit());
+        return write(page.array(), page.arrayOffset() + page.position(), page.remaining());
     }
 
     @Override
@@ -84,6 +86,7 @@ public class CloudResettableInputStream extends InputStream implements ICloudWri
             uploadAndWait();
         }
         writeBuffer.put((byte) b);
+        writtenBytes += 1;
     }
 
     @Override
@@ -102,7 +105,7 @@ public class CloudResettableInputStream extends InputStream implements ICloudWri
             // enough to write all
             if (writeBuffer.remaining() > pageRemaining) {
                 writeBuffer.put(b, offset, pageRemaining);
-                return len;
+                break;
             }
 
             int remaining = writeBuffer.remaining();
@@ -112,9 +115,15 @@ public class CloudResettableInputStream extends InputStream implements ICloudWri
             uploadAndWait();
         }
 
+        writtenBytes += len;
         return len;
     }
 
+    @Override
+    public long position() {
+        return writtenBytes;
+    }
+
     @Override
     public int read(byte[] b, int off, int len) throws IOException {
         if (writeBuffer.remaining() == 0) {
@@ -173,6 +182,7 @@ public class CloudResettableInputStream extends InputStream implements ICloudWri
         if (writeBuffer == null) {
             writeBuffer = bufferProvider.getBuffer();
             writeBuffer.clear();
+            writtenBytes = 0;
         }
     }
 
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/ICloudWriter.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/ICloudWriter.java
index 15822c4d9c..920be9ce2e 100644
--- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/ICloudWriter.java
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/ICloudWriter.java
@@ -60,6 +60,11 @@ public interface ICloudWriter {
      */
     int write(byte[] b, int off, int len) throws HyracksDataException;
 
+    /**
+     * @return the number of bytes written so far, i.e., the file offset of the next byte to be written
+     */
+    long position();
+
     /**
      * Finish the write operation
      * Note: this should be called upon successful write
diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSWriter.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSWriter.java
index a6dade5507..d9119a58f7 100644
--- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSWriter.java
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSWriter.java
@@ -41,12 +41,14 @@ public class GCSWriter implements ICloudWriter {
     private final IRequestProfiler profiler;
     private final Storage gcsClient;
     private WriteChannel writer = null;
+    private long writtenBytes;
 
     public GCSWriter(String bucket, String path, Storage gcsClient, IRequestProfiler profiler) {
         this.bucket = bucket;
         this.path = path;
         this.profiler = profiler;
         this.gcsClient = gcsClient;
+        writtenBytes = 0;
     }
 
     @Override
@@ -67,17 +69,23 @@ public class GCSWriter implements ICloudWriter {
             throw HyracksDataException.create(e);
         }
 
+        writtenBytes += written;
         return written;
     }
 
     @Override
     public int write(byte[] b, int off, int len) throws HyracksDataException {
         return write(ByteBuffer.wrap(b, off, len));
+    }
+
+    @Override
+    public long position() {
+        return writtenBytes;
     }
 
     @Override
     public void write(int b) throws HyracksDataException {
         write(ByteBuffer.wrap(new byte[] { (byte) b }));
     }
 
     @Override
@@ -105,6 +113,7 @@ public class GCSWriter implements ICloudWriter {
         if (writer == null) {
             writer = gcsClient.writer(BlobInfo.newBuilder(BlobId.of(bucket, path)).build());
             writer.setChunkSize(WRITE_BUFFER_SIZE);
+            writtenBytes = 0;
             log("STARTED");
         }
     }
diff --git a/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/buffercache/context/DefaultCloudOnlyWriteContext.java b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/buffercache/context/DefaultCloudOnlyWriteContext.java
index c75e83a6e0..69d33e83bd 100644
--- a/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/buffercache/context/DefaultCloudOnlyWriteContext.java
+++ b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/buffercache/context/DefaultCloudOnlyWriteContext.java
@@ -36,13 +36,13 @@ public final class DefaultCloudOnlyWriteContext implements IBufferCacheWriteCont
     public int write(IOManager ioManager, IFileHandle handle, long offset, ByteBuffer data)
             throws HyracksDataException {
         ICloudIOManager cloudIOManager = (ICloudIOManager) ioManager;
-        return cloudIOManager.cloudWrite(handle, data);
+        return cloudIOManager.cloudWrite(handle, offset, data);
     }
 
     @Override
     public long write(IOManager ioManager, IFileHandle handle, long offset, ByteBuffer[] data)
             throws HyracksDataException {
         ICloudIOManager cloudIOManager = (ICloudIOManager) ioManager;
-        return cloudIOManager.cloudWrite(handle, data);
+        return cloudIOManager.cloudWrite(handle, offset, data);
     }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/io/ICloudIOManager.java b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/io/ICloudIOManager.java
index ab57139195..9b6a2ff96f 100644
--- a/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/io/ICloudIOManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/io/ICloudIOManager.java
@@ -74,19 +74,21 @@ public interface ICloudIOManager {
      * Write to cloud only
      *
      * @param fHandle file handle
+     * @param offset  file offset of the first byte of {@code data}
      * @param data    to write
      * @return number of written bytes
      */
-    int cloudWrite(IFileHandle fHandle, ByteBuffer data) throws HyracksDataException;
+    int cloudWrite(IFileHandle fHandle, long offset, ByteBuffer data) throws HyracksDataException;
 
     /**
      * Write to cloud only
      *
      * @param fHandle file handle
+     * @param offset  file offset of the first byte of {@code data}
      * @param data    to write
      * @return number of written bytes
      */
-    long cloudWrite(IFileHandle fHandle, ByteBuffer[] data) throws HyracksDataException;
+    long cloudWrite(IFileHandle fHandle, long offset, ByteBuffer[] data) throws HyracksDataException;
 
     /**
      * Punch a hole in a file
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/file/CompressedBufferedFileHandle.java b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/file/CompressedBufferedFileHandle.java
index 6ad4d27e79..6bc85ffeb4 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/file/CompressedBufferedFileHandle.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/file/CompressedBufferedFileHandle.java
@@ -123,8 +123,10 @@ public class CompressedBufferedFileHandle extends BufferedFileHandle {
                 expectedBytesWritten = cBuffer.limit();
                 bytesWritten = context.write(ioManager, handle, offset, cBuffer);
             } else {
-                //Compression did not gain any savings
+                // Compression did not gain any savings
                 final ByteBuffer[] buffers = header.prepareWrite(cPage);
+                // Incompressible pages should be written entirely
+                fixBufferPointers(buffers[1], 0);
                 offset = compressedFileManager.writePageInfo(pageId, bufferCache.getPageSizeWithHeader());
                 expectedBytesWritten = buffers[0].limit() + (long) buffers[1].limit();
                 bytesWritten = context.write(ioManager, handle, offset, buffers);
@@ -152,7 +154,7 @@ public class CompressedBufferedFileHandle extends BufferedFileHandle {
         long bytesWritten = 0;
         for (int i = 1; i < totalPages; i++) {
             fixBufferPointers(uBuffer, i);
-            cBuffer.position(0);
+            cBuffer.clear();
 
             final ByteBuffer writeBuffer;
             if (compressToWriteBuffer(uBuffer, cBuffer) < bufferCache.getPageSize()) {

Reply via email to