This is an automated email from the ASF dual-hosted git repository.
wyk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/asterixdb.git
The following commit(s) were added to refs/heads/master by this push:
new 49824f58b6 [ASTERIXDB-3453][STO] Incompressible pages are not written
as full pages
49824f58b6 is described below
commit 49824f58b68dd6621ba41754e8e07d1cab356c70
Author: Wail Alkowaileet <[email protected]>
AuthorDate: Sun Jul 7 16:42:16 2024 -0700
[ASTERIXDB-3453][STO] Incompressible pages are not written as full pages
- user model changes: no
- storage format changes: no
- interface changes: yes
Details:
Incompressible pages must be written as full pages (i.e., as
page + header) entirely to ensure the positions of the cloud
files stay in sync with the local files.
Change-Id: Iccebe6fcab375d064825ab2e9343b96daf8afbc6
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/18446
Integration-Tests: Jenkins <[email protected]>
Reviewed-by: Wail Alkowaileet <[email protected]>
Reviewed-by: Murtadha Hubail <[email protected]>
Tested-by: Jenkins <[email protected]>
---
.../asterix/cloud/AbstractCloudIOManager.java | 38 ++++++++++++++++++----
.../asterix/cloud/CloudResettableInputStream.java | 14 ++++++--
.../apache/asterix/cloud/clients/ICloudWriter.java | 5 +++
.../cloud/clients/google/gcs/GCSWriter.java | 14 +++++++-
.../context/DefaultCloudOnlyWriteContext.java | 4 +--
.../apache/hyracks/cloud/io/ICloudIOManager.java | 6 ++--
.../common/file/CompressedBufferedFileHandle.java | 6 ++--
7 files changed, 71 insertions(+), 16 deletions(-)
diff --git
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/AbstractCloudIOManager.java
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/AbstractCloudIOManager.java
index 91c24e87b6..033f13511b 100644
---
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/AbstractCloudIOManager.java
+++
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/AbstractCloudIOManager.java
@@ -210,10 +210,11 @@ public abstract class AbstractCloudIOManager extends
IOManager implements IParti
}
@Override
- public final int cloudWrite(IFileHandle fHandle, ByteBuffer data) throws
HyracksDataException {
+ public final int cloudWrite(IFileHandle fHandle, long offset, ByteBuffer
data) throws HyracksDataException {
ICloudWriter cloudWriter = ((CloudFileHandle)
fHandle).getCloudWriter();
int writtenBytes;
try {
+ ensurePosition(fHandle, cloudWriter.position(), offset);
writtenBytes = cloudWriter.write(data);
} catch (HyracksDataException e) {
cloudWriter.abort();
@@ -223,10 +224,11 @@ public abstract class AbstractCloudIOManager extends
IOManager implements IParti
}
@Override
- public final long cloudWrite(IFileHandle fHandle, ByteBuffer[] data)
throws HyracksDataException {
+ public final long cloudWrite(IFileHandle fHandle, long offset,
ByteBuffer[] data) throws HyracksDataException {
ICloudWriter cloudWriter = ((CloudFileHandle)
fHandle).getCloudWriter();
int writtenBytes;
try {
+ ensurePosition(fHandle, cloudWriter.position(), offset);
writtenBytes = cloudWriter.write(data[0], data[1]);
} catch (HyracksDataException e) {
cloudWriter.abort();
@@ -265,18 +267,33 @@ public abstract class AbstractCloudIOManager extends
IOManager implements IParti
@Override
public final long doSyncWrite(IFileHandle fHandle, long offset,
ByteBuffer[] dataArray)
throws HyracksDataException {
+ // Save original position and limit
+ ByteBuffer buffer1 = dataArray[0];
+ int position1 = buffer1.position();
+
+ ByteBuffer buffer2 = dataArray[1];
+ int position2 = buffer2.position();
+
long writtenBytes = localIoManager.doSyncWrite(fHandle, offset,
dataArray);
- dataArray[0].flip();
- dataArray[1].flip();
- cloudWrite(fHandle, dataArray);
+
+ // Restore original position
+ buffer1.position(position1);
+ buffer2.position(position2);
+
+ cloudWrite(fHandle, offset, dataArray);
return writtenBytes;
}
@Override
public final int doSyncWrite(IFileHandle fHandle, long offset, ByteBuffer
data) throws HyracksDataException {
+ // Save original position and limit
+ int position = data.position();
+
int writtenBytes = localIoManager.doSyncWrite(fHandle, offset, data);
- data.flip();
- cloudWrite(fHandle, data);
+
+ // Restore original position
+ data.position(position);
+ cloudWrite(fHandle, offset, data);
return writtenBytes;
}
@@ -390,4 +407,11 @@ public abstract class AbstractCloudIOManager extends
IOManager implements IParti
performBulkOperation(deleteBulkOperation);
}
}
+
+ private void ensurePosition(IFileHandle fileHandle, long cloudOffset, long
requestedWriteOffset) {
+ if (cloudOffset != requestedWriteOffset) {
+ throw new IllegalStateException("Misaligned positions in " +
fileHandle.getFileReference()
+ + ", cloudOffset: " + cloudOffset + " !=
requestedWriteOffset: " + requestedWriteOffset);
+ }
+ }
}
diff --git
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/CloudResettableInputStream.java
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/CloudResettableInputStream.java
index cace898928..a233ca5afb 100644
---
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/CloudResettableInputStream.java
+++
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/CloudResettableInputStream.java
@@ -36,10 +36,12 @@ public class CloudResettableInputStream extends InputStream
implements ICloudWri
private final IWriteBufferProvider bufferProvider;
private final ICloudBufferedWriter bufferedWriter;
private ByteBuffer writeBuffer;
+ private long writtenBytes;
public CloudResettableInputStream(ICloudBufferedWriter bufferedWriter,
IWriteBufferProvider bufferProvider) {
this.bufferedWriter = bufferedWriter;
this.bufferProvider = bufferProvider;
+ writtenBytes = 0;
}
/* ************************************************************
@@ -75,7 +77,7 @@ public class CloudResettableInputStream extends InputStream
implements ICloudWri
@Override
public int write(ByteBuffer page) throws HyracksDataException {
open();
- return write(page.array(), 0, page.limit());
+ return write(page.array(), page.position(), page.remaining());
}
@Override
@@ -84,6 +86,7 @@ public class CloudResettableInputStream extends InputStream
implements ICloudWri
uploadAndWait();
}
writeBuffer.put((byte) b);
+ writtenBytes += 1;
}
@Override
@@ -102,7 +105,7 @@ public class CloudResettableInputStream extends InputStream
implements ICloudWri
// enough to write all
if (writeBuffer.remaining() > pageRemaining) {
writeBuffer.put(b, offset, pageRemaining);
- return len;
+ break;
}
int remaining = writeBuffer.remaining();
@@ -112,9 +115,15 @@ public class CloudResettableInputStream extends
InputStream implements ICloudWri
uploadAndWait();
}
+ writtenBytes += len;
return len;
}
+ @Override
+ public long position() {
+ return writtenBytes;
+ }
+
@Override
public int read(byte[] b, int off, int len) throws IOException {
if (writeBuffer.remaining() == 0) {
@@ -173,6 +182,7 @@ public class CloudResettableInputStream extends InputStream
implements ICloudWri
if (writeBuffer == null) {
writeBuffer = bufferProvider.getBuffer();
writeBuffer.clear();
+ writtenBytes = 0;
}
}
diff --git
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/ICloudWriter.java
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/ICloudWriter.java
index 15822c4d9c..920be9ce2e 100644
---
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/ICloudWriter.java
+++
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/ICloudWriter.java
@@ -60,6 +60,11 @@ public interface ICloudWriter {
*/
int write(byte[] b, int off, int len) throws HyracksDataException;
+ /**
+ * @return the current position of the writer
+ */
+ long position();
+
/**
* Finish the write operation
* Note: this should be called upon successful write
diff --git
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSWriter.java
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSWriter.java
index a6dade5507..d9119a58f7 100644
---
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSWriter.java
+++
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSWriter.java
@@ -41,12 +41,14 @@ public class GCSWriter implements ICloudWriter {
private final IRequestProfiler profiler;
private final Storage gcsClient;
private WriteChannel writer = null;
+ private long writtenBytes;
public GCSWriter(String bucket, String path, Storage gcsClient,
IRequestProfiler profiler) {
this.bucket = bucket;
this.path = path;
this.profiler = profiler;
this.gcsClient = gcsClient;
+ writtenBytes = 0;
}
@Override
@@ -67,17 +69,26 @@ public class GCSWriter implements ICloudWriter {
throw HyracksDataException.create(e);
}
+ writtenBytes += written;
return written;
}
@Override
public int write(byte[] b, int off, int len) throws HyracksDataException {
- return write(ByteBuffer.wrap(b, off, len));
+ int written = write(ByteBuffer.wrap(b, off, len));
+ writtenBytes += written;
+ return written;
+ }
+
+ @Override
+ public long position() {
+ return writtenBytes;
}
@Override
public void write(int b) throws HyracksDataException {
write(ByteBuffer.wrap(new byte[] { (byte) b }));
+ writtenBytes += 1;
}
@Override
@@ -105,6 +116,7 @@ public class GCSWriter implements ICloudWriter {
if (writer == null) {
writer = gcsClient.writer(BlobInfo.newBuilder(BlobId.of(bucket,
path)).build());
writer.setChunkSize(WRITE_BUFFER_SIZE);
+ writtenBytes = 0;
log("STARTED");
}
}
diff --git
a/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/buffercache/context/DefaultCloudOnlyWriteContext.java
b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/buffercache/context/DefaultCloudOnlyWriteContext.java
index c75e83a6e0..69d33e83bd 100644
---
a/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/buffercache/context/DefaultCloudOnlyWriteContext.java
+++
b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/buffercache/context/DefaultCloudOnlyWriteContext.java
@@ -36,13 +36,13 @@ public final class DefaultCloudOnlyWriteContext implements
IBufferCacheWriteCont
public int write(IOManager ioManager, IFileHandle handle, long offset,
ByteBuffer data)
throws HyracksDataException {
ICloudIOManager cloudIOManager = (ICloudIOManager) ioManager;
- return cloudIOManager.cloudWrite(handle, data);
+ return cloudIOManager.cloudWrite(handle, offset, data);
}
@Override
public long write(IOManager ioManager, IFileHandle handle, long offset,
ByteBuffer[] data)
throws HyracksDataException {
ICloudIOManager cloudIOManager = (ICloudIOManager) ioManager;
- return cloudIOManager.cloudWrite(handle, data);
+ return cloudIOManager.cloudWrite(handle, offset, data);
}
}
diff --git
a/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/io/ICloudIOManager.java
b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/io/ICloudIOManager.java
index ab57139195..9b6a2ff96f 100644
---
a/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/io/ICloudIOManager.java
+++
b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/io/ICloudIOManager.java
@@ -74,19 +74,21 @@ public interface ICloudIOManager {
* Write to cloud only
*
* @param fHandle file handle
+ * @param offset position to write from
* @param data to write
* @return number of written bytes
*/
- int cloudWrite(IFileHandle fHandle, ByteBuffer data) throws
HyracksDataException;
+ int cloudWrite(IFileHandle fHandle, long offset, ByteBuffer data) throws
HyracksDataException;
/**
* Write to cloud only
*
* @param fHandle file handle
+ * @param offset position to write from
* @param data to write
* @return number of written bytes
*/
- long cloudWrite(IFileHandle fHandle, ByteBuffer[] data) throws
HyracksDataException;
+ long cloudWrite(IFileHandle fHandle, long offset, ByteBuffer[] data)
throws HyracksDataException;
/**
* Punch a hole in a file
diff --git
a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/file/CompressedBufferedFileHandle.java
b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/file/CompressedBufferedFileHandle.java
index 6ad4d27e79..6bc85ffeb4 100644
---
a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/file/CompressedBufferedFileHandle.java
+++
b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/file/CompressedBufferedFileHandle.java
@@ -123,8 +123,10 @@ public class CompressedBufferedFileHandle extends
BufferedFileHandle {
expectedBytesWritten = cBuffer.limit();
bytesWritten = context.write(ioManager, handle, offset,
cBuffer);
} else {
- //Compression did not gain any savings
+ // Compression did not gain any savings
final ByteBuffer[] buffers = header.prepareWrite(cPage);
+ // Incompressible pages should be written entirely
+ fixBufferPointers(buffers[1], 0);
offset = compressedFileManager.writePageInfo(pageId,
bufferCache.getPageSizeWithHeader());
expectedBytesWritten = buffers[0].limit() + (long)
buffers[1].limit();
bytesWritten = context.write(ioManager, handle, offset,
buffers);
@@ -152,7 +154,7 @@ public class CompressedBufferedFileHandle extends
BufferedFileHandle {
long bytesWritten = 0;
for (int i = 1; i < totalPages; i++) {
fixBufferPointers(uBuffer, i);
- cBuffer.position(0);
+ cBuffer.clear();
final ByteBuffer writeBuffer;
if (compressToWriteBuffer(uBuffer, cBuffer) <
bufferCache.getPageSize()) {