This is an automated email from the ASF dual-hosted git repository.
wyk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/asterixdb.git
The following commit(s) were added to refs/heads/master by this push:
new b83b06f2ab [MULTIPLE ISSUES][STO] Multiple fixes for cloud dep.
b83b06f2ab is described below
commit b83b06f2abe92f9705d04b00a76d657ec518a40b
Author: Wail Alkowaileet <[email protected]>
AuthorDate: Fri Jun 21 10:35:24 2024 -0700
[MULTIPLE ISSUES][STO] Multiple fixes for cloud dep.
- user model changes: no
- storage format changes: no
- interface changes: yes
Details:
ASTERIXDB-3436
- Make certain cloud requests uninterruptible
- Retry on request failure
ASTERIXDB-3443
- Fix MergedPageRanges out-of-bounds exception
Change-Id: Ia8c34d4ba7a3527fea22149e5065815095c39ab2
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/18378
Integration-Tests: Jenkins <[email protected]>
Tested-by: Jenkins <[email protected]>
Reviewed-by: Wail Alkowaileet <[email protected]>
Reviewed-by: Murtadha Hubail <[email protected]>
---
.../cloud_storage/CloudStorageUnstableTest.java | 109 +++++++++++
.../asterix/cloud/AbstractCloudIOManager.java | 28 ++-
.../asterix/cloud/CloudResettableInputStream.java | 12 +-
.../apache/asterix/cloud/LazyCloudIOManager.java | 9 +-
.../cloud/bulk/DeleteBulkCloudOperation.java | 3 +-
.../asterix/cloud/clients/CloudClientProvider.java | 16 +-
.../asterix/cloud/clients/UnstableCloudClient.java | 187 +++++++++++++++++++
.../cloud/clients/aws/s3/S3BufferedWriter.java | 28 +--
.../cloud/clients/aws/s3/S3CloudClient.java | 7 +
.../apache/hyracks/cloud/io/ICloudIOManager.java | 17 +-
.../cloud/io/request/ICloudBeforeRetryRequest.java | 34 ++++
.../hyracks/cloud/io/request/ICloudRequest.java | 32 ++++
.../cloud/io/request/ICloudReturnableRequest.java | 34 ++++
.../hyracks/cloud/io/stream/CloudInputStream.java | 134 ++++++++++++++
.../cloud/util/CloudRetryableRequestUtil.java | 202 +++++++++++++++++++++
.../read/AbstractPageRangesComputer.java | 74 ++++++++
.../buffercache/read/CloudColumnReadContext.java | 16 +-
.../buffercache/read/CloudMegaPageReadContext.java | 137 +++++---------
.../cloud/buffercache/read/MergedPageRanges.java | 184 -------------------
.../cloud/buffercache/read/PageRangesComputer.java | 179 ++++++++++++++++++
.../buffercache/read/SinglePageRangeComputer.java | 59 ++++++
.../buffercache/read/MergedPageRagesTest.java | 117 ++++++++----
22 files changed, 1267 insertions(+), 351 deletions(-)
diff --git
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/cloud_storage/CloudStorageUnstableTest.java
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/cloud_storage/CloudStorageUnstableTest.java
new file mode 100644
index 0000000000..a517ed4dad
--- /dev/null
+++
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/cloud_storage/CloudStorageUnstableTest.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.test.cloud_storage;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Random;
+
+import org.apache.asterix.api.common.LocalCloudUtil;
+import org.apache.asterix.common.config.GlobalConfig;
+import org.apache.asterix.test.common.TestExecutor;
+import org.apache.asterix.test.runtime.LangExecutionUtil;
+import org.apache.asterix.testframework.context.TestCaseContext;
+import org.apache.asterix.testframework.xml.Description;
+import org.apache.asterix.testframework.xml.TestCase;
+import org.apache.hyracks.cloud.util.CloudRetryableRequestUtil;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.junit.AfterClass;
+import org.junit.Assume;
+import org.junit.BeforeClass;
+import org.junit.FixMethodOrder;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.MethodSorters;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+
+/**
+ * Run tests in a cloud deployment environment with a simulated unstable connection.
+ * <p>
+ * {@link #setUp()} sets the {@link CloudRetryableRequestUtil#CLOUD_UNSTABLE_MODE} system property to "true"
+ * before the local S3 environment and the test executor are started, and {@link #tearDown()} clears it again.
+ * Only a random subset of the cloud storage suite is executed (each test is kept with probability 1/10); the
+ * seed used for the selection is logged.
+ * <p>
+ * A test is skipped (via a JUnit assumption) when it has at most one compilation unit and the description of
+ * its first compilation unit equals {@code EXCLUDED_TESTS}.
+ */
+@RunWith(Parameterized.class)
+@FixMethodOrder(MethodSorters.NAME_ASCENDING)
+public class CloudStorageUnstableTest {
+
+ private static final Logger LOGGER = LogManager.getLogger();
+
+ private final TestCaseContext tcCtx;
+ private static final String SUITE_TESTS = "testsuite_cloud_storage.xml";
+ private static final String ONLY_TESTS =
"testsuite_cloud_storage_only.xml";
+ private static final String CONFIG_FILE_NAME =
"src/test/resources/cc-cloud-storage.conf";
+ private static final String DELTA_RESULT_PATH = "results_cloud";
+ private static final String EXCLUDED_TESTS = "MP";
+
+ public CloudStorageUnstableTest(TestCaseContext tcCtx) {
+ this.tcCtx = tcCtx;
+ }
+
+ @BeforeClass
+ public static void setUp() throws Exception {
+ System.setProperty(CloudRetryableRequestUtil.CLOUD_UNSTABLE_MODE,
"true");
+ LocalCloudUtil.startS3CloudEnvironment(true);
+ TestExecutor testExecutor = new TestExecutor(DELTA_RESULT_PATH);
+ testExecutor.executorId = "cloud";
+ testExecutor.stripSubstring = "//DB:";
+ LangExecutionUtil.setUp(CONFIG_FILE_NAME, testExecutor);
+ System.setProperty(GlobalConfig.CONFIG_FILE_PROPERTY,
CONFIG_FILE_NAME);
+ }
+
+ @AfterClass
+ public static void tearDown() throws Exception {
+ System.clearProperty(CloudRetryableRequestUtil.CLOUD_UNSTABLE_MODE);
+ LangExecutionUtil.tearDown();
+ }
+
+ @Parameters(name = "CloudStorageUnstableTest {index}: {0}")
+ public static Collection<Object[]> tests() throws Exception {
+ long seed = System.nanoTime(); // time-based seed, so the selected subset differs between runs; logged below
+ Random random = new Random(seed);
+ LOGGER.info("CloudStorageUnstableTest seed {}", seed);
+ Collection<Object[]> tests = LangExecutionUtil.tests(ONLY_TESTS,
SUITE_TESTS);
+ List<Object[]> selected = new ArrayList<>();
+ for (Object[] test : tests) {
+ // Keep each test with probability 1/10, i.e., roughly 10% of the suite
+ if (random.nextInt(10) == 0) {
+ selected.add(test);
+ }
+ }
+ return selected;
+ }
+
+ @Test
+ public void test() throws Exception {
+ List<TestCase.CompilationUnit> cu =
tcCtx.getTestCase().getCompilationUnit();
+ Assume.assumeTrue(cu.size() > 1 ||
!EXCLUDED_TESTS.equals(getText(cu.get(0).getDescription()))); // skip single-unit tests described as "MP"
+ LangExecutionUtil.test(tcCtx);
+ }
+
+ private static String getText(Description description) {
+ return description == null ? "" : description.getValue();
+ }
+}
diff --git
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/AbstractCloudIOManager.java
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/AbstractCloudIOManager.java
index 35b074431e..91c24e87b6 100644
---
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/AbstractCloudIOManager.java
+++
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/AbstractCloudIOManager.java
@@ -51,6 +51,10 @@ import org.apache.hyracks.api.io.IFileHandle;
import org.apache.hyracks.api.io.IIOBulkOperation;
import org.apache.hyracks.api.util.IoUtil;
import org.apache.hyracks.cloud.io.ICloudIOManager;
+import org.apache.hyracks.cloud.io.request.ICloudBeforeRetryRequest;
+import org.apache.hyracks.cloud.io.request.ICloudRequest;
+import org.apache.hyracks.cloud.io.stream.CloudInputStream;
+import org.apache.hyracks.cloud.util.CloudRetryableRequestUtil;
import org.apache.hyracks.control.nc.io.IOManager;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -173,12 +177,30 @@ public abstract class AbstractCloudIOManager extends
IOManager implements IParti
@Override
public final void cloudRead(IFileHandle fHandle, long offset, ByteBuffer
data) throws HyracksDataException {
- cloudClient.read(bucket, fHandle.getFileReference().getRelativePath(),
offset, data);
+ int position = data.position();
+ ICloudRequest request =
+ () -> cloudClient.read(bucket,
fHandle.getFileReference().getRelativePath(), offset, data);
+ ICloudBeforeRetryRequest retry = () -> data.position(position);
+ CloudRetryableRequestUtil.run(request, retry);
}
@Override
- public final InputStream cloudRead(IFileHandle fHandle, long offset, long
length) {
- return cloudClient.getObjectStream(bucket,
fHandle.getFileReference().getRelativePath(), offset, length);
+ public final CloudInputStream cloudRead(IFileHandle fHandle, long offset,
long length) throws HyracksDataException {
+ return CloudRetryableRequestUtil.run(() -> new CloudInputStream(this,
fHandle,
+ cloudClient.getObjectStream(bucket,
fHandle.getFileReference().getRelativePath(), offset, length),
+ offset, length));
+ }
+
+ @Override
+ public void restoreStream(CloudInputStream cloudStream) {
+ LOGGER.warn("Restoring stream from cloud, {}", cloudStream);
+ /*
+ * This cloud request should not be called using
CloudRetryableRequestUtil as it is the responsibility of the
+ * caller to wrap this request as ICloudRequest or ICloudBeforeRetryRequest.
+ */
+ InputStream stream = cloudClient.getObjectStream(bucket,
cloudStream.getPath(), cloudStream.getOffset(),
+ cloudStream.getRemaining());
+ cloudStream.setInputStream(stream);
}
@Override
diff --git
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/CloudResettableInputStream.java
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/CloudResettableInputStream.java
index 9e3502091d..cace898928 100644
---
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/CloudResettableInputStream.java
+++
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/CloudResettableInputStream.java
@@ -25,6 +25,9 @@ import java.nio.ByteBuffer;
import org.apache.asterix.cloud.clients.ICloudBufferedWriter;
import org.apache.asterix.cloud.clients.ICloudWriter;
import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.cloud.io.request.ICloudBeforeRetryRequest;
+import org.apache.hyracks.cloud.io.request.ICloudRequest;
+import org.apache.hyracks.cloud.util.CloudRetryableRequestUtil;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -141,7 +144,9 @@ public class CloudResettableInputStream extends InputStream
implements ICloudWri
*/
writeBuffer.flip();
try {
- bufferedWriter.uploadLast(this, writeBuffer);
+ ICloudRequest request = () ->
bufferedWriter.uploadLast(this, writeBuffer);
+ ICloudBeforeRetryRequest retry = () ->
writeBuffer.position(0);
+
CloudRetryableRequestUtil.runWithNoRetryOnInterruption(request, retry);
} catch (Exception e) {
LOGGER.error(e);
throw HyracksDataException.create(e);
@@ -182,7 +187,10 @@ public class CloudResettableInputStream extends
InputStream implements ICloudWri
private void uploadAndWait() throws HyracksDataException {
writeBuffer.flip();
try {
- bufferedWriter.upload(this, writeBuffer.limit());
+ ICloudRequest request = () -> bufferedWriter.upload(this,
writeBuffer.limit());
+ ICloudBeforeRetryRequest retry = () -> writeBuffer.position(0);
+ // This will be interrupted and the interruption will be followed
by a halt
+ CloudRetryableRequestUtil.runWithNoRetryOnInterruption(request,
retry);
} catch (Exception e) {
LOGGER.error(e);
throw HyracksDataException.create(e);
diff --git
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/LazyCloudIOManager.java
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/LazyCloudIOManager.java
index 49b919c3aa..f8cf2c34e2 100644
---
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/LazyCloudIOManager.java
+++
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/LazyCloudIOManager.java
@@ -53,6 +53,7 @@ import org.apache.hyracks.api.io.IFileHandle;
import org.apache.hyracks.api.io.IIOBulkOperation;
import org.apache.hyracks.api.io.IODeviceHandle;
import org.apache.hyracks.api.util.IoUtil;
+import org.apache.hyracks.cloud.util.CloudRetryableRequestUtil;
import org.apache.hyracks.control.nc.io.IOManager;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -174,7 +175,7 @@ final class LazyCloudIOManager extends
AbstractCloudIOManager {
@Override
public Set<FileReference> list(FileReference dir, FilenameFilter filter)
throws HyracksDataException {
- return accessor.doList(dir, filter);
+ return CloudRetryableRequestUtil.run(() -> accessor.doList(dir,
filter));
}
@Override
@@ -189,18 +190,18 @@ final class LazyCloudIOManager extends
AbstractCloudIOManager {
@Override
public byte[] readAllBytes(FileReference fileRef) throws
HyracksDataException {
- return accessor.doReadAllBytes(fileRef);
+ return CloudRetryableRequestUtil.run(() ->
accessor.doReadAllBytes(fileRef));
}
@Override
public void delete(FileReference fileRef) throws HyracksDataException {
- accessor.doDelete(fileRef);
+ // Wrap the delete in a single retryable request, like list/readAllBytes/overwrite in this class.
+ // Nesting run() inside run() is redundant and would presumably repeat the whole inner retry
+ // sequence on every outer attempt.
+ CloudRetryableRequestUtil.run(() -> accessor.doDelete(fileRef));
log("DELETE", fileRef);
}
@Override
public void overwrite(FileReference fileRef, byte[] bytes) throws
HyracksDataException {
- accessor.doOverwrite(fileRef, bytes);
+ CloudRetryableRequestUtil.run(() -> accessor.doOverwrite(fileRef,
bytes));
log("WRITE", fileRef);
}
diff --git
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/bulk/DeleteBulkCloudOperation.java
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/bulk/DeleteBulkCloudOperation.java
index 8d28d3a0be..fcb7bda204 100644
---
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/bulk/DeleteBulkCloudOperation.java
+++
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/bulk/DeleteBulkCloudOperation.java
@@ -24,6 +24,7 @@ import java.util.stream.Collectors;
import org.apache.asterix.cloud.clients.ICloudClient;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.io.FileReference;
+import org.apache.hyracks.cloud.util.CloudRetryableRequestUtil;
import org.apache.hyracks.control.nc.io.IOManager;
import org.apache.hyracks.control.nc.io.bulk.DeleteBulkOperation;
import org.apache.logging.log4j.LogManager;
@@ -57,7 +58,7 @@ public class DeleteBulkCloudOperation extends
DeleteBulkOperation {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("Bulk deleting: local: {}, cloud: {}",
fileReferences, paths);
}
- cloudClient.deleteObjects(bucket, paths);
+ CloudRetryableRequestUtil.run(() -> cloudClient.deleteObjects(bucket,
paths));
// Bulk delete locally as well
super.performOperation();
callBack.call(fileReferences);
diff --git
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/CloudClientProvider.java
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/CloudClientProvider.java
index b0a1e0c612..ee43a2cadf 100644
---
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/CloudClientProvider.java
+++
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/CloudClientProvider.java
@@ -24,8 +24,10 @@ import
org.apache.asterix.cloud.clients.google.gcs.GCSClientConfig;
import org.apache.asterix.cloud.clients.google.gcs.GCSCloudClient;
import org.apache.asterix.common.config.CloudProperties;
import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.cloud.util.CloudRetryableRequestUtil;
public class CloudClientProvider {
+ private static final boolean UNSTABLE = isUnstable();
public static final String S3 = "s3";
public static final String GCS = "gs";
@@ -36,13 +38,21 @@ public class CloudClientProvider {
public static ICloudClient getClient(CloudProperties cloudProperties,
ICloudGuardian guardian)
throws HyracksDataException {
String storageScheme = cloudProperties.getStorageScheme();
+ ICloudClient cloudClient;
if (S3.equalsIgnoreCase(storageScheme)) {
S3ClientConfig config = S3ClientConfig.of(cloudProperties);
- return new S3CloudClient(config, guardian);
+ cloudClient = new S3CloudClient(config, guardian);
} else if (GCS.equalsIgnoreCase(storageScheme)) {
GCSClientConfig config = GCSClientConfig.of(cloudProperties);
- return new GCSCloudClient(config, guardian);
+ cloudClient = new GCSCloudClient(config, guardian);
+ } else {
+ throw new IllegalStateException("unsupported cloud storage scheme:
" + storageScheme);
}
- throw new IllegalStateException("unsupported cloud storage scheme: " +
storageScheme);
+
+ return UNSTABLE ? new UnstableCloudClient(cloudClient) : cloudClient;
+ }
+
+ private static boolean isUnstable() {
+ return
Boolean.getBoolean(CloudRetryableRequestUtil.CLOUD_UNSTABLE_MODE);
}
}
diff --git
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/UnstableCloudClient.java
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/UnstableCloudClient.java
new file mode 100644
index 0000000000..2ec5fb5157
--- /dev/null
+++
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/UnstableCloudClient.java
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.cloud.clients;
+
+import java.io.FilenameFilter;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.util.Collection;
+import java.util.Random;
+import java.util.Set;
+
+import org.apache.asterix.cloud.CloudResettableInputStream;
+import org.apache.asterix.cloud.IWriteBufferProvider;
+import org.apache.asterix.cloud.clients.aws.s3.S3CloudClient;
+import org.apache.asterix.cloud.clients.profiler.IRequestProfiler;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.io.FileReference;
+import org.apache.hyracks.control.nc.io.IOManager;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+public class UnstableCloudClient implements ICloudClient {
+ // 10% error rate
+ private static final double ERROR_RATE = 0.1d;
+ private static final Random RANDOM = new Random(0);
+ private final ICloudClient cloudClient;
+
+ public UnstableCloudClient(ICloudClient cloudClient) {
+ this.cloudClient = cloudClient;
+ }
+
+ @Override
+ public int getWriteBufferSize() {
+ return cloudClient.getWriteBufferSize();
+ }
+
+ @Override
+ public IRequestProfiler getProfiler() {
+ return cloudClient.getProfiler();
+ }
+
+ @Override
+ public ICloudWriter createWriter(String bucket, String path,
IWriteBufferProvider bufferProvider) {
+ if (cloudClient instanceof S3CloudClient) {
+ return createUnstableWriter((S3CloudClient) cloudClient, bucket,
path, bufferProvider);
+ }
+ return cloudClient.createWriter(bucket, path, bufferProvider);
+ }
+
+ @Override
+ public Set<CloudFile> listObjects(String bucket, String path,
FilenameFilter filter) {
+ return cloudClient.listObjects(bucket, path, filter);
+ }
+
+ @Override
+ public int read(String bucket, String path, long offset, ByteBuffer
buffer) throws HyracksDataException {
+ fail();
+ return cloudClient.read(bucket, path, offset, buffer);
+ }
+
+ @Override
+ public byte[] readAllBytes(String bucket, String path) throws
HyracksDataException {
+ fail();
+ return cloudClient.readAllBytes(bucket, path);
+ }
+
+ @Override
+ public InputStream getObjectStream(String bucket, String path, long
offset, long length) {
+ return cloudClient.getObjectStream(bucket, path, offset, length);
+ }
+
+ @Override
+ public void write(String bucket, String path, byte[] data) {
+ cloudClient.write(bucket, path, data);
+ }
+
+ @Override
+ public void copy(String bucket, String srcPath, FileReference destPath) {
+ cloudClient.copy(bucket, srcPath, destPath);
+ }
+
+ @Override
+ public void deleteObjects(String bucket, Collection<String> paths) {
+ cloudClient.deleteObjects(bucket, paths);
+ }
+
+ @Override
+ public long getObjectSize(String bucket, String path) throws
HyracksDataException {
+ fail();
+ return cloudClient.getObjectSize(bucket, path);
+ }
+
+ @Override
+ public boolean exists(String bucket, String path) throws
HyracksDataException {
+ fail();
+ return cloudClient.exists(bucket, path);
+ }
+
+ @Override
+ public boolean isEmptyPrefix(String bucket, String path) throws
HyracksDataException {
+ fail();
+ return cloudClient.isEmptyPrefix(bucket, path);
+ }
+
+ @Override
+ public IParallelDownloader createParallelDownloader(String bucket,
IOManager ioManager)
+ throws HyracksDataException {
+ return cloudClient.createParallelDownloader(bucket, ioManager);
+ }
+
+ @Override
+ public JsonNode listAsJson(ObjectMapper objectMapper, String bucket) {
+ return cloudClient.listAsJson(objectMapper, bucket);
+ }
+
+ @Override
+ public void close() throws HyracksDataException {
+ cloudClient.close();
+ }
+
+ private static void fail() throws HyracksDataException {
+ double prob = RANDOM.nextInt(100) / 100.0d;
+ if (prob <= ERROR_RATE) {
+ throw HyracksDataException.create(new IOException("Simulated
error"));
+ }
+ }
+
+ private static ICloudWriter createUnstableWriter(S3CloudClient
cloudClient, String bucket, String path,
+ IWriteBufferProvider bufferProvider) {
+ ICloudBufferedWriter bufferedWriter =
+ new
UnstableCloudBufferedWriter(cloudClient.createBufferedWriter(bucket, path));
+ return new CloudResettableInputStream(bufferedWriter, bufferProvider);
+ }
+
+ private static class UnstableCloudBufferedWriter implements
ICloudBufferedWriter {
+ private final ICloudBufferedWriter bufferedWriter;
+
+ private UnstableCloudBufferedWriter(ICloudBufferedWriter
bufferedWriter) {
+ this.bufferedWriter = bufferedWriter;
+ }
+
+ @Override
+ public void upload(InputStream stream, int length) throws
HyracksDataException {
+ fail();
+ bufferedWriter.upload(stream, length);
+ }
+
+ @Override
+ public void uploadLast(InputStream stream, ByteBuffer buffer) throws
HyracksDataException {
+ fail();
+ bufferedWriter.uploadLast(stream, buffer);
+ }
+
+ @Override
+ public boolean isEmpty() {
+ return bufferedWriter.isEmpty();
+ }
+
+ @Override
+ public void finish() throws HyracksDataException {
+ bufferedWriter.finish();
+ }
+
+ @Override
+ public void abort() throws HyracksDataException {
+ bufferedWriter.abort();
+ }
+ }
+}
diff --git
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3BufferedWriter.java
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3BufferedWriter.java
index d0dda2a02c..05a9fc183d 100644
---
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3BufferedWriter.java
+++
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3BufferedWriter.java
@@ -22,12 +22,12 @@ import java.io.InputStream;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
-import java.util.concurrent.TimeUnit;
import org.apache.asterix.cloud.clients.ICloudBufferedWriter;
import org.apache.asterix.cloud.clients.ICloudGuardian;
import org.apache.asterix.cloud.clients.profiler.IRequestProfiler;
import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.cloud.util.CloudRetryableRequestUtil;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -84,7 +84,6 @@ public class S3BufferedWriter implements ICloudBufferedWriter
{
if (uploadId == null) {
profiler.objectWrite();
PutObjectRequest request =
PutObjectRequest.builder().bucket(bucket).key(path).build();
- // TODO make retryable
s3Client.putObject(request, RequestBody.fromByteBuffer(buffer));
// Only set the uploadId if the putObject succeeds
uploadId = PUT_UPLOAD_ID;
@@ -111,28 +110,9 @@ public class S3BufferedWriter implements
ICloudBufferedWriter {
CompletedMultipartUpload completedMultipartUpload =
CompletedMultipartUpload.builder().parts(partQueue).build();
CompleteMultipartUploadRequest completeMultipartUploadRequest =
CompleteMultipartUploadRequest.builder()
.bucket(bucket).key(path).uploadId(uploadId).multipartUpload(completedMultipartUpload).build();
- int retries = 0;
- while (true) {
- try {
- completeMultipartUpload(completeMultipartUploadRequest);
- break;
- } catch (Exception e) {
- retries++;
- if (retries == MAX_RETRIES) {
- throw HyracksDataException.create(e);
- }
- LOGGER.info(() -> "S3 storage write retry, encountered: " +
e.getMessage());
-
- // Backoff for 1 sec for the first 2 retries, and 2 seconds
from there onward
- try {
- Thread.sleep(TimeUnit.SECONDS.toMillis(retries < 2 ? 1 :
2));
- } catch (InterruptedException ex) {
- Thread.currentThread().interrupt();
- throw HyracksDataException.create(ex);
- }
- }
- }
-
+ // This will be interrupted and the interruption will be followed by a
halt
+ CloudRetryableRequestUtil
+ .runWithNoRetryOnInterruption(() ->
completeMultipartUpload(completeMultipartUploadRequest));
log("FINISHED");
}
diff --git
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3CloudClient.java
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3CloudClient.java
index 5ce1f4393a..254bd0309c 100644
---
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3CloudClient.java
+++
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3CloudClient.java
@@ -288,6 +288,13 @@ public final class S3CloudClient implements ICloudClient {
s3Client.close();
}
+ /**
+ * FOR TESTING ONLY
+ */
+ public ICloudBufferedWriter createBufferedWriter(String bucket, String
path) {
+ return new S3BufferedWriter(s3Client, profiler, guardian, bucket,
path);
+ }
+
private static S3Client buildClient(S3ClientConfig config) {
S3ClientBuilder builder = S3Client.builder();
builder.credentialsProvider(config.createCredentialsProvider());
diff --git
a/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/io/ICloudIOManager.java
b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/io/ICloudIOManager.java
index 0dca41765b..ab57139195 100644
---
a/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/io/ICloudIOManager.java
+++
b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/io/ICloudIOManager.java
@@ -18,11 +18,14 @@
*/
package org.apache.hyracks.cloud.io;
-import java.io.InputStream;
import java.nio.ByteBuffer;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.io.IFileHandle;
+import org.apache.hyracks.cloud.io.request.ICloudBeforeRetryRequest;
+import org.apache.hyracks.cloud.io.request.ICloudRequest;
+import org.apache.hyracks.cloud.io.stream.CloudInputStream;
+import org.apache.hyracks.cloud.util.CloudRetryableRequestUtil;
/**
* Certain operations needed to be provided by {@link
org.apache.hyracks.api.io.IIOManager} to support cloud
@@ -45,7 +48,17 @@ public interface ICloudIOManager {
* @param offset starting offset
* @return input stream of the required data
*/
- InputStream cloudRead(IFileHandle fHandle, long offset, long length);
+ CloudInputStream cloudRead(IFileHandle fHandle, long offset, long length)
throws HyracksDataException;
+
+ /**
+ * Tries to restore the stream created by {@link #cloudRead(IFileHandle,
long, long)}.
+ * NOTE: Implementations of this method should not use {@link
CloudRetryableRequestUtil} internally.
+ * It is the responsibility of the caller to either call this method as a
+ * {@link ICloudRequest} or as a {@link ICloudBeforeRetryRequest}.
+ *
+ * @param stream to restore
+ */
+ void restoreStream(CloudInputStream stream);
/**
* Write to local drive only
diff --git
a/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/io/request/ICloudBeforeRetryRequest.java
b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/io/request/ICloudBeforeRetryRequest.java
new file mode 100644
index 0000000000..4dab80d44c
--- /dev/null
+++
b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/io/request/ICloudBeforeRetryRequest.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.cloud.io.request;
+
+import org.apache.hyracks.cloud.util.CloudRetryableRequestUtil;
+
+/**
+ * Certain cloud requests require some cleanup (or restoring a state) before a
retry is performed.
+ * An implementation of this interface should be provided if such cleanup is
required when
+ * reattempting a request using {@link CloudRetryableRequestUtil}
+ * (for example, resetting a buffer's position before the request is issued again).
+ */
+@FunctionalInterface
+public interface ICloudBeforeRetryRequest {
+ /**
+ * Run the pre-retry routine before reattempting an {@link ICloudRequest} or
an {@link ICloudReturnableRequest}
+ */
+ void beforeRetry();
+}
diff --git
a/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/io/request/ICloudRequest.java
b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/io/request/ICloudRequest.java
new file mode 100644
index 0000000000..4f2532348d
--- /dev/null
+++
b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/io/request/ICloudRequest.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.cloud.io.request;
+
+import java.io.IOException;
+
+/**
+ * A cloud request that does not return a value and can be retried using {@link
org.apache.hyracks.cloud.util.CloudRetryableRequestUtil}.
+ * Use {@link ICloudReturnableRequest} for requests that produce a result.
+ */
+@FunctionalInterface
+public interface ICloudRequest {
+ /**
+ * Run the cloud request
+ *
+ * @throws IOException if the request reports an I/O failure
+ */
+ void call() throws IOException;
+}
diff --git
a/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/io/request/ICloudReturnableRequest.java
b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/io/request/ICloudReturnableRequest.java
new file mode 100644
index 0000000000..748165ad8f
--- /dev/null
+++
b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/io/request/ICloudReturnableRequest.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.cloud.io.request;
+
+import java.io.IOException;
+
+/**
+ * A returnable cloud request that can be retried using
+ * {@link org.apache.hyracks.cloud.util.CloudRetryableRequestUtil}
+ *
+ * @param <T> type of the value produced by the request
+ */
+@FunctionalInterface
+public interface ICloudReturnableRequest<T> {
+ /**
+ * Run the cloud request
+ *
+ * @return the value of the request
+ * @throws IOException if the request fails
+ */
+ T call() throws IOException;
+}
diff --git
a/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/io/stream/CloudInputStream.java
b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/io/stream/CloudInputStream.java
new file mode 100644
index 0000000000..df12003b2f
--- /dev/null
+++
b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/io/stream/CloudInputStream.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.cloud.io.stream;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.io.IFileHandle;
+import org.apache.hyracks.cloud.io.ICloudIOManager;
+import org.apache.hyracks.cloud.io.request.ICloudBeforeRetryRequest;
+import org.apache.hyracks.cloud.io.request.ICloudRequest;
+import org.apache.hyracks.cloud.util.CloudRetryableRequestUtil;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+/**
+ * Wraps an {@link InputStream} opened against a cloud file and keeps track of the current stream offset and
+ * the number of bytes that are still expected from it. When a read or a skip fails, the stream is handed to
+ * {@link ICloudIOManager#restoreStream} together with the tracked offset/remaining values.
+ * NOTE(review): restoreStream presumably re-opens the stream at {@link #getOffset()} and installs it through
+ * {@link #setInputStream(InputStream)} -- confirm against the ICloudIOManager implementation.
+ */
+public final class CloudInputStream {
+    private static final Logger LOGGER = LogManager.getLogger();
+    private final ICloudIOManager cloudIOManager;
+    private final IFileHandle handle;
+    private InputStream in;
+    private long offset;
+    private long remaining;
+
+    /**
+     * @param cloudIOManager cloud I/O manager used to restore the stream on failures
+     * @param handle         handle of the file being streamed
+     * @param in             the underlying stream
+     * @param offset         offset the stream currently points at
+     * @param length         number of bytes expected to be consumed from the stream
+     */
+    public CloudInputStream(ICloudIOManager cloudIOManager, IFileHandle handle, InputStream in, long offset,
+            long length) {
+        this.cloudIOManager = cloudIOManager;
+        this.handle = handle;
+        this.in = in;
+        this.offset = offset;
+        this.remaining = length;
+    }
+
+    /**
+     * @return the relative path of the streamed file
+     */
+    public String getPath() {
+        return handle.getFileReference().getRelativePath();
+    }
+
+    /**
+     * @return the tracked offset of the stream
+     */
+    public long getOffset() {
+        return offset;
+    }
+
+    /**
+     * @return the tracked number of bytes that have not been consumed yet
+     */
+    public long getRemaining() {
+        return remaining;
+    }
+
+    /**
+     * Fills the provided (array-backed) buffer from its current position up to its limit. On failure, the
+     * buffer's position is rewound to where it was on entry and the stream is restored before retrying.
+     *
+     * @param buffer buffer to fill; must be backed by an accessible array
+     * @throws HyracksDataException if the read could not be completed
+     */
+    public void read(ByteBuffer buffer) throws HyracksDataException {
+        int startPosition = buffer.position();
+        ICloudRequest read = () -> {
+            while (buffer.remaining() > 0) {
+                // arrayOffset() is non-zero for sliced buffers; position() is relative to it
+                int length = in.read(buffer.array(), buffer.arrayOffset() + buffer.position(), buffer.remaining());
+                if (length < 0) {
+                    throw new IllegalStateException("Stream should not be empty!");
+                }
+                buffer.position(buffer.position() + length);
+            }
+        };
+
+        ICloudBeforeRetryRequest retry = () -> {
+            buffer.position(startPosition);
+            cloudIOManager.restoreStream(this);
+        };
+
+        CloudRetryableRequestUtil.run(read, retry);
+
+        // Account only for the bytes consumed by this call (the buffer may not have started at position 0)
+        int bytesRead = buffer.limit() - startPosition;
+        offset += bytesRead;
+        remaining -= bytesRead;
+    }
+
+    /**
+     * Advances the stream to the provided offset. Nothing happens if the stream is already at or past it.
+     *
+     * @param newOffset offset to advance to
+     */
+    public void skipTo(long newOffset) throws HyracksDataException {
+        if (newOffset > offset) {
+            skip(newOffset - offset);
+        }
+    }
+
+    /**
+     * Closes the underlying stream. A failure to close is logged and not propagated.
+     */
+    public void close() {
+        if (remaining != 0) {
+            LOGGER.warn("Closed cloud stream with nonzero bytes = {}", remaining);
+        }
+
+        try {
+            in.close();
+        } catch (IOException e) {
+            LOGGER.error("Failed to close stream", e);
+        }
+    }
+
+    /**
+     * Replaces the underlying stream (e.g., after the stream has been restored)
+     *
+     * @param in the new underlying stream
+     */
+    public void setInputStream(InputStream in) {
+        this.in = in;
+    }
+
+    private void skip(long n) throws HyracksDataException {
+        /*
+         * Advance offset and reduce the remaining so that the streamRestore will start from where we want to skip
+         * in case the stream has to be restored.
+         */
+        offset += n;
+        remaining -= n;
+
+        try {
+            long toSkip = n;
+            while (toSkip > 0) {
+                long skipped = in.skip(toSkip);
+                if (skipped > 0) {
+                    toSkip -= skipped;
+                } else if (in.read() >= 0) {
+                    // skip() is allowed to return 0 without being at the end; consume one byte to make progress
+                    toSkip--;
+                } else {
+                    // End of stream: fail instead of spinning forever on skip() returning 0
+                    throw new IOException("Reached end of stream with " + toSkip + " bytes left to skip");
+                }
+            }
+        } catch (Throwable e) {
+            // 'remaining' is the field here: the bytes still required from the stream after this skip
+            if (remaining > 0) {
+                // Only restore the stream if additional bytes are required
+                LOGGER.warn("Failed to skip {} bytes, restoring stream {}", n, this, e);
+                CloudRetryableRequestUtil.run(() -> cloudIOManager.restoreStream(this));
+            } else {
+                LOGGER.debug("Ignored failure while skipping {} bytes, no more bytes are required from {}", n, this,
+                        e);
+            }
+        }
+    }
+
+    @Override
+    public String toString() {
+        return "{file: " + handle.getFileReference() + ", streamOffset: " + offset + ", streamRemaining: " + remaining
+                + "}";
+    }
+}
diff --git
a/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/util/CloudRetryableRequestUtil.java
b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/util/CloudRetryableRequestUtil.java
new file mode 100644
index 0000000000..2d0afbe53e
--- /dev/null
+++
b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/util/CloudRetryableRequestUtil.java
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.cloud.util;
+
+import java.io.IOException;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.util.ExceptionUtils;
+import org.apache.hyracks.cloud.io.request.ICloudBeforeRetryRequest;
+import org.apache.hyracks.cloud.io.request.ICloudRequest;
+import org.apache.hyracks.cloud.io.request.ICloudReturnableRequest;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+/**
+ * Run {@link ICloudRequest} and {@link ICloudReturnableRequest} with retries
+ */
+public class CloudRetryableRequestUtil {
+    /**
+     * Whether simulating/testing unstable cloud environment or not. This value affects the number of retries.
+     * Set this as a system property to 'true' to indicate running an unstable cloud environment.
+     *
+     * @see System#setProperty(String, String)
+     */
+    public static final String CLOUD_UNSTABLE_MODE = "cloud.unstable.mode";
+    private static final int STABLE_NUMBER_OF_RETRIES = 5;
+    private static final int UNSTABLE_NUMBER_OF_RETRIES = 100;
+    private static final Logger LOGGER = LogManager.getLogger();
+    // Resolved once at class initialization from the CLOUD_UNSTABLE_MODE system property
+    private static final int NUMBER_OF_RETRIES = getNumberOfRetries();
+
+    private static final ICloudBeforeRetryRequest NO_OP_RETRY = () -> {
+    };
+
+    private CloudRetryableRequestUtil() {
+    }
+
+    /**
+     * Run an idempotent request and will retry if failed or interrupted
+     *
+     * @param request request to run
+     */
+    public static void run(ICloudRequest request) throws HyracksDataException {
+        run(request, NO_OP_RETRY);
+    }
+
+    /**
+     * Run a non-idempotent request and will retry if failed or interrupted.
+     * As the operation is not idempotent, {@link ICloudBeforeRetryRequest} ensures the idempotency of the provided
+     * operation. The thread's interrupted flag is cleared while the request runs and is set again before returning
+     * if an interrupt was observed.
+     *
+     * @param request request to run
+     * @param retry   a pre-retry routine to make the operation idempotent
+     */
+    public static void run(ICloudRequest request, ICloudBeforeRetryRequest retry) throws HyracksDataException {
+        boolean interrupted = Thread.interrupted();
+        try {
+            while (true) {
+                try {
+                    doRun(request, retry);
+                    break;
+                } catch (Throwable e) {
+                    // First, clear the interrupted flag
+                    interrupted |= Thread.interrupted();
+                    if (!ExceptionUtils.causedByInterrupt(e)) {
+                        // The cause isn't an interruption, rethrow
+                        throw e;
+                    }
+                    // Log before the pre-retry routine so the failure is recorded even if the routine throws
+                    LOGGER.warn("Ignored interrupting ICloudRequest", e);
+                    retry.beforeRetry();
+                }
+            }
+        } finally {
+            if (interrupted) {
+                Thread.currentThread().interrupt();
+            }
+        }
+    }
+
+    /**
+     * Run an idempotent returnable request and will retry if failed or interrupted.
+     *
+     * @param request request to run
+     * @param <T>     return type
+     * @return a value of return type
+     */
+    public static <T> T run(ICloudReturnableRequest<T> request) throws HyracksDataException {
+        return run(request, NO_OP_RETRY);
+    }
+
+    /**
+     * Run a non-idempotent returnable request and will retry if failed or interrupted.
+     * As the operation is not idempotent, {@link ICloudBeforeRetryRequest} ensures the idempotency of the provided
+     * operation. The thread's interrupted flag is cleared while the request runs and is set again before returning
+     * if an interrupt was observed.
+     *
+     * @param request request to run
+     * @param <T>     return type
+     * @param retry   a pre-retry routine to make the operation idempotent
+     * @return a value of return type
+     */
+    public static <T> T run(ICloudReturnableRequest<T> request, ICloudBeforeRetryRequest retry)
+            throws HyracksDataException {
+        boolean interrupted = Thread.interrupted();
+        try {
+            while (true) {
+                try {
+                    return doRun(request, retry);
+                } catch (Throwable e) {
+                    // First, clear the interrupted flag
+                    interrupted |= Thread.interrupted();
+                    if (!ExceptionUtils.causedByInterrupt(e)) {
+                        // The cause isn't an interruption, rethrow
+                        throw e;
+                    }
+                    // Log before the pre-retry routine so the failure is recorded even if the routine throws
+                    LOGGER.warn("Ignored interrupting ICloudReturnableRequest", e);
+                    retry.beforeRetry();
+                }
+            }
+        } finally {
+            if (interrupted) {
+                Thread.currentThread().interrupt();
+            }
+        }
+    }
+
+    /**
+     * Run an idempotent request and will retry if failed.
+     * This will not retry if the thread is interrupted
+     *
+     * @param request request to run
+     */
+    public static void runWithNoRetryOnInterruption(ICloudRequest request) throws HyracksDataException {
+        doRun(request, NO_OP_RETRY);
+    }
+
+    /**
+     * Run a non-idempotent request and will retry if failed
+     * This will not retry if the thread is interrupted.
+     * As the operation is not idempotent, {@link ICloudBeforeRetryRequest} ensures the idempotency of the provided
+     * operation
+     *
+     * @param request request to run
+     * @param retry   a pre-retry routine to make the operation idempotent
+     */
+    public static void runWithNoRetryOnInterruption(ICloudRequest request, ICloudBeforeRetryRequest retry)
+            throws HyracksDataException {
+        doRun(request, retry);
+    }
+
+    /**
+     * Calls the request and, on {@link IOException}, retries it up to NUMBER_OF_RETRIES times (i.e., the request is
+     * attempted at most NUMBER_OF_RETRIES + 1 times). The pre-retry routine runs before every retry.
+     */
+    private static <T> T doRun(ICloudReturnableRequest<T> request, ICloudBeforeRetryRequest retry)
+            throws HyracksDataException {
+        int retries = 0;
+        while (true) {
+            try {
+                return request.call();
+            } catch (IOException e) {
+                if (retries >= NUMBER_OF_RETRIES) {
+                    throw HyracksDataException.create(e);
+                }
+                retries++;
+                // Report the retry number (1..NUMBER_OF_RETRIES) and log before the pre-retry routine runs
+                LOGGER.warn("Failed to perform ICloudReturnableRequest, performing {}/{}", retries,
+                        NUMBER_OF_RETRIES, e);
+                retry.beforeRetry();
+            }
+        }
+    }
+
+    /**
+     * Calls the request and, on {@link IOException}, retries it up to NUMBER_OF_RETRIES times (i.e., the request is
+     * attempted at most NUMBER_OF_RETRIES + 1 times). The pre-retry routine runs before every retry.
+     */
+    private static void doRun(ICloudRequest request, ICloudBeforeRetryRequest retry) throws HyracksDataException {
+        int retries = 0;
+        while (true) {
+            try {
+                request.call();
+                break;
+            } catch (IOException e) {
+                if (retries >= NUMBER_OF_RETRIES) {
+                    throw HyracksDataException.create(e);
+                }
+                retries++;
+                // Report the retry number (1..NUMBER_OF_RETRIES) and log before the pre-retry routine runs
+                LOGGER.warn("Failed to perform ICloudRequest, performing {}/{}", retries, NUMBER_OF_RETRIES, e);
+                retry.beforeRetry();
+            }
+        }
+    }
+
+    private static int getNumberOfRetries() {
+        boolean unstable = Boolean.getBoolean(CLOUD_UNSTABLE_MODE);
+        return unstable ? UNSTABLE_NUMBER_OF_RETRIES : STABLE_NUMBER_OF_RETRIES;
+    }
+
+}
diff --git
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/buffercache/read/AbstractPageRangesComputer.java
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/buffercache/read/AbstractPageRangesComputer.java
new file mode 100644
index 0000000000..76b9650283
--- /dev/null
+++
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/buffercache/read/AbstractPageRangesComputer.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.storage.am.lsm.btree.column.cloud.buffercache.read;
+
+import java.util.BitSet;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.storage.common.buffercache.IBufferCache;
+
+/**
+ * Base class for computing the page ranges to pin. Concrete computers are obtained through
+ * {@link #create(int)}.
+ */
+public abstract class AbstractPageRangesComputer {
+    // Not referenced in this class; presumably an initial capacity for subclasses -- TODO confirm
+    protected static final int INITIAL_SIZE = 40;
+    // Indicates a page is requested or not
+    protected final BitSet requestedPages = new BitSet();
+
+    AbstractPageRangesComputer() {
+        // Package-private on purpose: instances are handed out by create(int)
+    }
+
+    /**
+     * @return the maximum number of ranges
+     */
+    abstract int getMaxNumberOfRanges();
+
+    /**
+     * Clear ranges
+     */
+    abstract void clear();
+
+    /**
+     * Add a range
+     *
+     * @param start range start
+     * @param end   range end
+     */
+    abstract void addRange(int start, int end);
+
+    /**
+     * Pin the calculated ranges
+     *
+     * @param ctx         Column mega-page buffer cache read context
+     * @param bufferCache buffer cache
+     * @param fileId      fileId
+     * @param pageZeroId  page zero ID
+     */
+    abstract void pin(CloudMegaPageReadContext ctx, IBufferCache bufferCache, int fileId, int pageZeroId)
+            throws HyracksDataException;
+
+    /**
+     * Creates a new range computer: a {@link SinglePageRangeComputer} when exactly one range is allowed,
+     * a {@link PageRangesComputer} otherwise
+     *
+     * @param maxNumberOfRanges maximum number of ranges
+     * @return a new instance of {@link AbstractPageRangesComputer}
+     */
+    static AbstractPageRangesComputer create(int maxNumberOfRanges) {
+        return maxNumberOfRanges == 1 ? new SinglePageRangeComputer() : new PageRangesComputer(maxNumberOfRanges);
+    }
+}
diff --git
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/buffercache/read/CloudColumnReadContext.java
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/buffercache/read/CloudColumnReadContext.java
index fef150a3ed..1ecc509467 100644
---
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/buffercache/read/CloudColumnReadContext.java
+++
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/buffercache/read/CloudColumnReadContext.java
@@ -22,6 +22,7 @@ import static
org.apache.hyracks.cloud.buffercache.context.DefaultCloudReadConte
import static
org.apache.hyracks.storage.am.lsm.btree.column.api.projection.ColumnProjectorType.MERGE;
import static
org.apache.hyracks.storage.am.lsm.btree.column.api.projection.ColumnProjectorType.MODIFY;
import static
org.apache.hyracks.storage.am.lsm.btree.column.api.projection.ColumnProjectorType.QUERY;
+import static
org.apache.hyracks.storage.am.lsm.btree.column.cloud.buffercache.read.CloudMegaPageReadContext.ALL_PAGES;
import java.nio.ByteBuffer;
import java.util.BitSet;
@@ -55,7 +56,7 @@ public final class CloudColumnReadContext implements
IColumnReadContext {
private final ColumnRanges columnRanges;
private final CloudMegaPageReadContext columnCtx;
private final BitSet projectedColumns;
- private final MergedPageRanges mergedPageRanges;
+ private final AbstractPageRangesComputer mergedPageRanges;
public CloudColumnReadContext(IColumnProjectionInfo projectionInfo,
IPhysicalDrive drive, BitSet plan) {
this.operation = projectionInfo.getProjectorType();
@@ -65,7 +66,7 @@ public final class CloudColumnReadContext implements
IColumnReadContext {
cloudOnlyColumns = new BitSet();
columnCtx = new CloudMegaPageReadContext(operation, columnRanges,
drive);
projectedColumns = new BitSet();
- mergedPageRanges = new MergedPageRanges(columnCtx, MAX_RANGES_COUNT);
+ mergedPageRanges = AbstractPageRangesComputer.create(MAX_RANGES_COUNT);
if (operation == QUERY || operation == MODIFY) {
for (int i = 0; i < projectionInfo.getNumberOfProjectedColumns();
i++) {
int columnIndex = projectionInfo.getColumnIndex(i);
@@ -113,10 +114,10 @@ public final class CloudColumnReadContext implements
IColumnReadContext {
public ICachedPage pinNext(ColumnBTreeReadLeafFrame leafFrame,
IBufferCache bufferCache, int fileId)
throws HyracksDataException {
int nextLeaf = leafFrame.getNextLeaf();
- // Release the previous pages (including page0)
+ // Release the previous pages
release(bufferCache);
+ // Release page0
bufferCache.unpin(leafFrame.getPage(), this);
-
// pin the next page0
ICachedPage nextPage =
bufferCache.pin(BufferedFileHandle.getDiskPageId(fileId, nextLeaf), this);
leafFrame.setPage(nextPage);
@@ -142,11 +143,11 @@ public final class CloudColumnReadContext implements
IColumnReadContext {
private void pinAll(int fileId, int pageZeroId, int numberOfPages,
IBufferCache bufferCache)
throws HyracksDataException {
- columnCtx.pin(bufferCache, fileId, pageZeroId, 1, numberOfPages,
numberOfPages, MergedPageRanges.EMPTY);
+ columnCtx.pin(bufferCache, fileId, pageZeroId, 1, numberOfPages,
ALL_PAGES);
}
private void pinProjected(int fileId, int pageZeroId, IBufferCache
bufferCache) throws HyracksDataException {
- mergedPageRanges.reset();
+ mergedPageRanges.clear();
int[] columnsOrder = columnRanges.getColumnsOrder();
int i = 0;
int columnIndex = columnsOrder[i];
@@ -186,8 +187,9 @@ public final class CloudColumnReadContext implements
IColumnReadContext {
mergedPageRanges.addRange(firstPageIdx, lastPageIdx);
}
+
// pin the calculated pageRanges
- mergedPageRanges.pin(fileId, pageZeroId, bufferCache);
+ mergedPageRanges.pin(columnCtx, bufferCache, fileId, pageZeroId);
}
@Override
diff --git
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/buffercache/read/CloudMegaPageReadContext.java
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/buffercache/read/CloudMegaPageReadContext.java
index ed6e83f914..29c9467476 100644
---
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/buffercache/read/CloudMegaPageReadContext.java
+++
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/buffercache/read/CloudMegaPageReadContext.java
@@ -22,7 +22,6 @@ import static
org.apache.hyracks.storage.am.lsm.btree.column.api.projection.Colu
import static
org.apache.hyracks.storage.common.buffercache.context.read.DefaultBufferCacheReadContextProvider.DEFAULT;
import java.io.IOException;
-import java.io.InputStream;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.BitSet;
@@ -32,6 +31,7 @@ import org.apache.hyracks.api.exceptions.HyracksDataException;
import
org.apache.hyracks.cloud.buffercache.context.BufferCacheCloudReadContextUtil;
import org.apache.hyracks.cloud.buffercache.page.CloudCachedPage;
import org.apache.hyracks.cloud.io.ICloudIOManager;
+import org.apache.hyracks.cloud.io.stream.CloudInputStream;
import org.apache.hyracks.control.nc.io.IOManager;
import
org.apache.hyracks.storage.am.lsm.btree.column.api.projection.ColumnProjectorType;
import org.apache.hyracks.storage.am.lsm.btree.column.cloud.ColumnRanges;
@@ -47,22 +47,20 @@ import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@NotThreadSafe
-final class CloudMegaPageReadContext implements IBufferCacheReadContext {
+public final class CloudMegaPageReadContext implements IBufferCacheReadContext
{
private static final Logger LOGGER = LogManager.getLogger();
+ static final BitSet ALL_PAGES = new BitSet();
private final ColumnProjectorType operation;
private final ColumnRanges columnRanges;
private final IPhysicalDrive drive;
private final List<ICachedPage> pinnedPages;
private int numberOfContiguousPages;
- // For logging, to get actual number of wanted pages
- private int numberOfWantedPages;
private int pageCounter;
- private InputStream gapStream;
+ private CloudInputStream gapStream;
- // For debugging
- private long streamOffset;
- private long remainingStreamBytes;
+ // for debugging
+ int pageZeroId;
CloudMegaPageReadContext(ColumnProjectorType operation, ColumnRanges
columnRanges, IPhysicalDrive drive) {
this.operation = operation;
@@ -71,13 +69,13 @@ final class CloudMegaPageReadContext implements
IBufferCacheReadContext {
pinnedPages = new ArrayList<>();
}
- void pin(IBufferCache bufferCache, int fileId, int pageZeroId, int start,
int numberOfPages,
- int numberOfWantedPages, BitSet unwantedPages) throws
HyracksDataException {
+ void pin(IBufferCache bufferCache, int fileId, int pageZeroId, int start,
int numberOfPages, BitSet requestedPages)
+ throws HyracksDataException {
closeStream();
this.numberOfContiguousPages = numberOfPages;
- this.numberOfWantedPages = numberOfWantedPages;
pageCounter = 0;
- doPin(bufferCache, fileId, pageZeroId, start, numberOfPages,
numberOfWantedPages, unwantedPages);
+ this.pageZeroId = pageZeroId;
+ doPin(bufferCache, fileId, pageZeroId, start, numberOfPages,
requestedPages);
}
@Override
@@ -86,10 +84,10 @@ final class CloudMegaPageReadContext implements
IBufferCacheReadContext {
if (cachedPage.skipCloudStream()) {
/*
* This page is requested but the buffer cache has a valid copy in
memory. Also, the page itself was
- * requested to be read from the cloud. Since this page is valid,
no buffer cache read() will be performed.
- * As the buffer cache read() is also responsible for persisting
the bytes read from the cloud, we can end
- * up writing the bytes of this page in the position of another
page. Therefore, we should skip the bytes
- * for this particular page to avoid placing the bytes of this
page into another page's position.
+ * requested to be read from the cloud. Since this page
is valid, no buffer cache read() will be
+ * performed. As the buffer cache read() is also responsible for
persisting the bytes read from the cloud,
+ * we can end up writing the bytes of this page in the position of
another page. Therefore, we should skip
+ * the bytes for this particular page to avoid placing the bytes
of this page into another page's position.
*/
skipStreamIfOpened(cachedPage);
}
@@ -152,49 +150,28 @@ final class CloudMegaPageReadContext implements
IBufferCacheReadContext {
pinnedPages.clear();
}
- void closeStream() throws HyracksDataException {
+ void closeStream() {
if (gapStream != null) {
- if (remainingStreamBytes != 0) {
- LOGGER.warn("Closed cloud stream with nonzero bytes = {}",
remainingStreamBytes);
- }
-
- try {
- gapStream.close();
- gapStream = null;
- } catch (IOException e) {
- throw HyracksDataException.create(e);
- }
+ gapStream.close();
+ gapStream = null;
}
}
private void readFromStream(IOManager ioManager, BufferedFileHandle
fileHandle, BufferCacheHeaderHelper header,
CachedPage cPage, boolean persist) throws HyracksDataException {
- InputStream stream = getOrCreateStream(ioManager, fileHandle, cPage);
+ CloudInputStream stream = getOrCreateStream(ioManager, fileHandle,
cPage);
ByteBuffer buffer = header.getBuffer();
buffer.position(0);
- // If the stream consists of the unwanted pages,
- // if the currentPage's offset is greater, this means
- // the streamOffset is pointing to a previous page.
-
- // hence we should skip those many bytes.
- // eg: if pageId(cPage) = 7 and streamOffset is pointing at 5
- // then we need to jump 5,6 page worth of compressed size.
- if (cPage.getCompressedPageOffset() > streamOffset) {
- skipBytes(cPage.getCompressedPageOffset() - streamOffset);
- }
+ /*
+ * The 'gapStream' could point to an offset of an unwanted page due to
range merging. For example, if
+ * 'gapStream' is currently at the offset of pageId = 5 and the cPage
is for pageId = 7, then, the stream
+ * must be advanced to the cPage's offset (i.e., offset of pageId = 7)
-- skipping pages 5 and 6.
+ */
+ gapStream.skipTo(cPage.getCompressedPageOffset());
- try {
- while (buffer.remaining() > 0) {
- int length = stream.read(buffer.array(), buffer.position(),
buffer.remaining());
- if (length < 0) {
- throw new IllegalStateException("Stream should not be
empty!");
- }
- buffer.position(buffer.position() + length);
- }
- } catch (IOException e) {
- throw HyracksDataException.create(e);
- }
+ // Get the page's data from the cloud
+ doStreamRead(stream, buffer);
// Flip the buffer after reading to restore the correct position
buffer.flip();
@@ -204,12 +181,9 @@ final class CloudMegaPageReadContext implements
IBufferCacheReadContext {
ICloudIOManager cloudIOManager = (ICloudIOManager) ioManager;
BufferCacheCloudReadContextUtil.persist(cloudIOManager,
fileHandle.getFileHandle(), buffer, offset);
}
-
- streamOffset += cPage.getCompressedPageSize();
- remainingStreamBytes -= cPage.getCompressedPageSize();
}
- private InputStream getOrCreateStream(IOManager ioManager,
BufferedFileHandle fileHandle, CachedPage cPage)
+ private CloudInputStream getOrCreateStream(IOManager ioManager,
BufferedFileHandle fileHandle, CachedPage cPage)
throws HyracksDataException {
if (gapStream != null) {
return gapStream;
@@ -219,34 +193,24 @@ final class CloudMegaPageReadContext implements
IBufferCacheReadContext {
long offset = cPage.getCompressedPageOffset();
int pageId = BufferedFileHandle.getPageId(cPage.getDiskPageId());
long length = fileHandle.getPagesTotalSize(pageId, requiredNumOfPages);
- remainingStreamBytes = length;
- streamOffset = offset;
- LOGGER.info(
- "Cloud stream read for pageId={} starting from pageCounter={}
out of "
- + "numberOfContiguousPages={} with
numberOfWantedPages={}"
- + " (streamOffset = {}, remainingStreamBytes = {})",
- pageId, pageCounter, numberOfContiguousPages,
numberOfWantedPages, streamOffset, remainingStreamBytes);
-
ICloudIOManager cloudIOManager = (ICloudIOManager) ioManager;
gapStream = cloudIOManager.cloudRead(fileHandle.getFileHandle(),
offset, length);
+ LOGGER.info(
+ "Cloud stream read for pageId={} starting from pageCounter={}
out of "
+ + "numberOfContiguousPages={}. pageZeroId={} stream:
{}",
+ pageId, pageCounter, numberOfContiguousPages, pageZeroId,
gapStream);
+
return gapStream;
}
- private void skipBytes(long length) throws HyracksDataException {
- if (gapStream == null) {
- return;
- }
-
+ private void doStreamRead(CloudInputStream stream, ByteBuffer buffer)
throws HyracksDataException {
+ int length = buffer.remaining();
try {
- long lengthToSkip = length;
- while (length > 0) {
- length -= gapStream.skip(length);
- }
- streamOffset += lengthToSkip;
- remainingStreamBytes -= lengthToSkip;
- } catch (IOException e) {
- throw HyracksDataException.create(e);
+ stream.read(buffer);
+ } catch (Throwable th) {
+ LOGGER.warn("Failed to READ {} bytes from stream {}", length,
gapStream);
+ throw HyracksDataException.create(th);
}
}
@@ -255,34 +219,29 @@ final class CloudMegaPageReadContext implements
IBufferCacheReadContext {
return;
}
+ // Ensure the stream starts from the page's offset and also skip the
page's content
+ long newOffset = cPage.getCompressedPageOffset() +
cPage.getCompressedPageSize();
try {
- long remaining = cPage.getCompressedPageSize();
- while (remaining > 0) {
- remaining -= gapStream.skip(remaining);
- }
- streamOffset += cPage.getCompressedPageSize();
- remainingStreamBytes -= cPage.getCompressedPageSize();
+ gapStream.skipTo(newOffset);
} catch (IOException e) {
+ LOGGER.warn("Failed to SKIP to new offset {} from stream {}",
newOffset, gapStream);
throw HyracksDataException.create(e);
}
}
private void doPin(IBufferCache bufferCache, int fileId, int pageZeroId,
int start, int numberOfPages,
- int numberOfWantedPages, BitSet unwantedPages) throws
HyracksDataException {
+ BitSet requestedPages) throws HyracksDataException {
for (int i = start; i < start + numberOfPages; i++) {
- int pageId = pageZeroId + i;
- long dpid = BufferedFileHandle.getDiskPageId(fileId, pageId);
try {
- if (!unwantedPages.get(pageId)) {
+ if (requestedPages == ALL_PAGES || requestedPages.get(i)) {
+ int pageId = pageZeroId + i;
+ long dpid = BufferedFileHandle.getDiskPageId(fileId,
pageId);
pinnedPages.add(bufferCache.pin(dpid, this));
}
pageCounter++;
} catch (Throwable e) {
- LOGGER.error(
- "Error while pinning page number {} with number of
pages streamed {}, "
- + "with actually wanted number of pages {}"
- + "(streamOffset:{}, remainingStreamBytes: {})
columnRanges:\n {}",
- i, numberOfPages, numberOfWantedPages, streamOffset,
remainingStreamBytes, columnRanges);
+ LOGGER.error("Error while pinning page number {} with number
of pages {}. "
+ + "stream: {}, columnRanges:\n {}", i, numberOfPages,
gapStream, columnRanges);
throw e;
}
}
diff --git
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/buffercache/read/MergedPageRanges.java
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/buffercache/read/MergedPageRanges.java
deleted file mode 100644
index c0c2fc92b8..0000000000
---
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/buffercache/read/MergedPageRanges.java
+++ /dev/null
@@ -1,184 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hyracks.storage.am.lsm.btree.column.cloud.buffercache.read;
-
-import java.util.BitSet;
-
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.storage.am.lsm.btree.column.cloud.IntPairUtil;
-import org.apache.hyracks.storage.common.buffercache.IBufferCache;
-
-import it.unimi.dsi.fastutil.ints.IntArrayList;
-import it.unimi.dsi.fastutil.ints.IntList;
-import it.unimi.dsi.fastutil.longs.LongArrayPriorityQueue;
-import it.unimi.dsi.fastutil.longs.LongPriorityQueue;
-
-/**
- * Merge the given ranges such that the maximum number of ranges <= N.
- * Merge should be greedy as the range having lower gaps should be given
priority.
- */
-public class MergedPageRanges {
- public static final BitSet EMPTY = new BitSet();
- private final CloudMegaPageReadContext columnCtx;
- private final int numRequiredRanges;
- private final IntList pageRanges;
- private final LongPriorityQueue gapRanges;
- // indicates the index of the ranges which are merged
- private final BitSet mergedIndex = new BitSet();
- // indicates a page is requested or not
- private final BitSet unwantedPages = new BitSet();
- // indicates the extra pages got included while a merge
- private int currentIndex = 0;
- private int numRanges;
-
- MergedPageRanges(CloudMegaPageReadContext columnCtx, int
numRequiredRanges) {
- this.numRequiredRanges = numRequiredRanges;
- this.pageRanges = new IntArrayList(40);
- this.gapRanges = new
LongArrayPriorityQueue(IntPairUtil.FIRST_COMPARATOR);
- this.columnCtx = columnCtx;
- this.numRanges = 0;
- }
-
- public void reset() {
- mergedIndex.clear();
- pageRanges.clear();
- gapRanges.clear();
- numRanges = 0;
- currentIndex = 0;
- }
-
- public void addRange(int rangeStart, int rangeEnd) {
- pageRanges.add(rangeStart);
- pageRanges.add(rangeEnd);
- numRanges++;
- }
-
- public void mergeRanges() {
- // totalMerges = totalRanges - MAXIMUM_RESULTANT_RANGES
- int merges = numRanges - numRequiredRanges;
- for (int i = 2; i < pageRanges.size(); i += 2) {
- int previousRangeEnd = pageRanges.getInt(i - 1);
- int currentRangeStart = pageRanges.getInt(i);
- // this could be optimized to just enqueue "merges" ranges,
- // but won't be much diff as the number of ranges gonna be small
- long gap = IntPairUtil.of(currentRangeStart - previousRangeEnd, i
/ 2);
- gapRanges.enqueue(gap);
- }
-
- int count = 0;
- while (count < merges) {
- // extract the lower 32 bits for the index.
- int index = IntPairUtil.getSecond(gapRanges.dequeueLong());
- // set the bit from [index - 1, index] indicating
- // the index and index-1 are merged.
- mergedIndex.set(index - 1, index + 1);
- count++;
- }
- }
-
- public void pin(int fileId, int pageZeroId, IBufferCache bufferCache)
throws HyracksDataException {
- // since the numRanges are already within set threshold
- if (numRanges <= numRequiredRanges) {
- pinWithoutMerge(fileId, pageZeroId, bufferCache);
- return;
- }
- pinWithMerge(fileId, pageZeroId, bufferCache);
- }
-
- private void pinWithoutMerge(int fileId, int pageZeroId, IBufferCache
bufferCache) throws HyracksDataException {
- for (int pageIndex = 1; pageIndex < pageRanges.size(); pageIndex += 2)
{
- int lastPageIndex = pageRanges.getInt(pageIndex);
- int firstPageIndex = pageRanges.getInt(pageIndex - 1);
- int numberOfPages = lastPageIndex - firstPageIndex + 1;
- columnCtx.pin(bufferCache, fileId, pageZeroId, firstPageIndex,
numberOfPages, numberOfPages, EMPTY);
- }
- }
-
- private void pinWithMerge(int fileId, int pageZeroId, IBufferCache
bufferCache) throws HyracksDataException {
- // merge the range based on the numRequiredRanges.
- mergeRanges();
- // go through page ranges and pin the required ranges.
- int rangeCnt = 0;
- while (rangeCnt < numRequiredRanges) {
- unwantedPages.clear();
- long mergedRange = getNextRange();
-
- int firstRangeIdx = IntPairUtil.getFirst(mergedRange);
- int lastRangeIdx = IntPairUtil.getSecond(mergedRange);
-
- // since the ranges are flattened out in the pageRanges.
- // hence ith index's element would be at [2*i, 2*i + 1]
- int firstRangeStart = pageRanges.getInt(2 * firstRangeIdx);
- int firstRangeEnd = pageRanges.getInt(2 * firstRangeIdx + 1);
- int lastRangeStart = pageRanges.getInt(2 * lastRangeIdx);
- int lastRangeEnd = pageRanges.getInt(2 * lastRangeIdx + 1);
-
- int numberOfPages = lastRangeEnd - firstRangeStart + 1;
- // Number of unwanted pages will be zero, when there is just a
single range (i.e. no merge)
- boolean areUnwantedPages = firstRangeIdx != lastRangeIdx;
- // and when the there is no extra page being fetched. eg: [1 2] [3
4]
- // for: [ 1 2 ] [ 4 5 ] [ 7 8 ] -> [ 1 8 ] ( fromIndex = 0,
toIndex = 2 )
- // numberOfUnwantedPages = (4 - 2 - 1) + (7 - 5 -1) = 2
- areUnwantedPages = areUnwantedPages && (lastRangeStart -
firstRangeEnd > 1);
- int numberOfUnwantedPages = 0;
- if (areUnwantedPages) {
- // iterate through the index and mark the gaps
- for (int fromIndex = firstRangeIdx; fromIndex < lastRangeIdx;
fromIndex++) {
- // Gap = V (2 * (fromIndex+1) ) - V(fromIndex * 2 + 1)
- // V(index) = value at the index
- int fromRangeEnd = pageRanges.getInt(2 * fromIndex + 1);
- int toRangeStart = pageRanges.getInt(2 * (fromIndex + 1));
- // fromRangeEnd != toRangeStart, as they would have been
merged already
- int rangeGap = (fromRangeEnd == toRangeStart) ? 0 :
toRangeStart - fromRangeEnd - 1;
- if (rangeGap > 0) {
- unwantedPages.set(fromRangeEnd + 1, toRangeStart);
- }
- numberOfUnwantedPages += rangeGap;
- }
- }
-
- columnCtx.pin(bufferCache, fileId, pageZeroId, firstRangeStart,
numberOfPages,
- numberOfPages - numberOfUnwantedPages, unwantedPages);
- rangeCnt++;
- }
- }
-
- // making package-private for MergedPageRangesTest
- long getNextRange() {
- int fromIndex = currentIndex;
- int endIndex = currentIndex;
- int toIndex;
-
- // move till we have a set index, indicating all the indexes
- // are merged into one range.
- while (endIndex < numRanges && mergedIndex.get(endIndex)) {
- endIndex++;
- }
-
- if (fromIndex == endIndex) {
- currentIndex = endIndex + 1;
- toIndex = endIndex;
- } else {
- currentIndex = endIndex;
- toIndex = endIndex - 1;
- }
-
- return IntPairUtil.of(fromIndex, toIndex);
- }
-}
diff --git
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/buffercache/read/PageRangesComputer.java
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/buffercache/read/PageRangesComputer.java
new file mode 100644
index 0000000000..28612c49d2
--- /dev/null
+++
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/buffercache/read/PageRangesComputer.java
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.storage.am.lsm.btree.column.cloud.buffercache.read;
+
+import java.util.Arrays;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.storage.am.lsm.btree.column.cloud.IntPairUtil;
+import org.apache.hyracks.storage.common.buffercache.IBufferCache;
+
+import it.unimi.dsi.fastutil.ints.IntArrayList;
+import it.unimi.dsi.fastutil.ints.IntList;
+import it.unimi.dsi.fastutil.longs.LongArrayPriorityQueue;
+import it.unimi.dsi.fastutil.longs.LongPriorityQueue;
+
+/**
+ * Merge the given ranges such that the maximum number of ranges <= N.
+ * Merge should be greedy as the range having lower gaps should be given
priority.
+ */
+final class PageRangesComputer extends AbstractPageRangesComputer {
+ static final int SINGLE_RANGE = 0;
+ static final int EACH_RANGE = 1;
+ static final int BOUNDARIES = 2;
+
+ private final int maxNumberOfRanges;
+ private final LongPriorityQueue gapRanges;
+ final IntList pageRanges;
+ final int[] rangeBoundaries;
+
+ PageRangesComputer(int maxNumberOfRanges) {
+ this.maxNumberOfRanges = maxNumberOfRanges;
+ pageRanges = new IntArrayList(INITIAL_SIZE);
+ gapRanges = new LongArrayPriorityQueue(IntPairUtil.FIRST_COMPARATOR);
+ rangeBoundaries = new int[maxNumberOfRanges];
+ }
+
+ @Override
+ int getMaxNumberOfRanges() {
+ return maxNumberOfRanges;
+ }
+
+ @Override
+ void clear() {
+ pageRanges.clear();
+ gapRanges.clear();
+ requestedPages.clear();
+ }
+
+ @Override
+ void addRange(int rangeStart, int rangeEnd) {
+ int previousEnd = pageRanges.size() - 1;
+
+ pageRanges.add(rangeStart);
+ pageRanges.add(rangeEnd);
+ requestedPages.set(rangeStart, rangeEnd + 1);
+
+ if (previousEnd > 0) {
+ int maxNumberOfCuts = maxNumberOfRanges - 1;
+ int gapSize = rangeStart - pageRanges.getInt(previousEnd) - 1;
+ if (gapRanges.size() < maxNumberOfCuts) {
+ // Didn't reach the maximum number cuts, add this gap
+ gapRanges.enqueue(IntPairUtil.of(gapSize, previousEnd));
+ } else if (IntPairUtil.getFirst(gapRanges.firstLong()) < gapSize) {
+ // Found a bigger gap. Remove the smallest and add this new
bigger gap
+ gapRanges.dequeueLong();
+ gapRanges.enqueue(IntPairUtil.of(gapSize, previousEnd));
+ }
+ // Otherwise, this gap is not bigger than the smallest gap kept in the queue,
+ // so it can never be chosen as a cut. Ignore it
+ }
+ }
+
+ @Override
+ void pin(CloudMegaPageReadContext ctx, IBufferCache bufferCache, int
fileId, int pageZeroId)
+ throws HyracksDataException {
+ int mergeResult = mergeRanges();
+ switch (mergeResult) {
+ case SINGLE_RANGE:
+ pinAsSingleRange(ctx, bufferCache, fileId, pageZeroId);
+ break;
+ case EACH_RANGE:
+ pinEachRange(ctx, bufferCache, fileId, pageZeroId);
+ break;
+ default:
+ pinMergedRanges(ctx, bufferCache, fileId, pageZeroId);
+ break;
+ }
+ }
+
+ int mergeRanges() {
+ int i = 0;
+ int maxGap = 0;
+ while (!gapRanges.isEmpty()) {
+ long pair = gapRanges.dequeueLong();
+ maxGap = Math.max(maxGap, IntPairUtil.getFirst(pair));
+ rangeBoundaries[i] = IntPairUtil.getSecond(pair);
+ i++;
+ }
+
+ if (maxGap == 1) {
+ // The biggest gap is 1, merge the ranges in a single range
+ return SINGLE_RANGE;
+ }
+
+ if (getNumberOfRanges() <= maxNumberOfRanges) {
+ // the number of ranges are within the limit, pin each range
separately
+ return EACH_RANGE;
+ }
+
+ // Set the last boundary
+ rangeBoundaries[maxNumberOfRanges - 1] = pageRanges.size() - 1;
+
+ // Sort cuts smallest to largest
+ Arrays.sort(rangeBoundaries);
+
+ // Use the boundaries to cut the ranges into separate ones
+ return BOUNDARIES;
+ }
+
+ private int getNumberOfRanges() {
+ return pageRanges.size() / 2;
+ }
+
+ private void pinAsSingleRange(CloudMegaPageReadContext ctx, IBufferCache
bufferCache, int fileId, int pageZeroId)
+ throws HyracksDataException {
+ int start = pageRanges.getInt(0);
+ int end = pageRanges.getInt(pageRanges.size() - 1);
+ int numberOfPages = end - start + 1;
+ ctx.pin(bufferCache, fileId, pageZeroId, start, numberOfPages,
requestedPages);
+ }
+
+ private void pinEachRange(CloudMegaPageReadContext ctx, IBufferCache
bufferCache, int fileId, int pageZeroId)
+ throws HyracksDataException {
+ // pageRanges is flat [start0, end0, start1, end1, ...]: bound by its size so every pair is pinned
+ for (int i = 0; i < pageRanges.size(); i += 2) {
+ int start = pageRanges.getInt(i);
+ int end = pageRanges.getInt(i + 1);
+ int numberOfPages = end - start + 1;
+ ctx.pin(bufferCache, fileId, pageZeroId, start, numberOfPages,
requestedPages);
+ }
+ }
+
+ private void pinMergedRanges(CloudMegaPageReadContext ctx, IBufferCache
bufferCache, int fileId, int pageZeroId)
+ throws HyracksDataException {
+ int startIndex = 0;
+ for (int i = 0; i < rangeBoundaries.length; i++) {
+ int endIndex = rangeBoundaries[i];
+ int rangeStart = pageRanges.getInt(startIndex);
+ int rangeEnd = pageRanges.getInt(endIndex);
+ int numberOfPages = rangeEnd - rangeStart + 1;
+ ctx.pin(bufferCache, fileId, pageZeroId, rangeStart,
numberOfPages, requestedPages);
+
+ // Start from the next cut
+ startIndex = endIndex + 1;
+ }
+ }
+
+ @Override
+ public String toString() {
+ return "{rangeBoundaries: " + Arrays.toString(rangeBoundaries) + ",
pageRanges: " + pageRanges
+ + ", requestedPages: " + requestedPages + "}";
+ }
+}
diff --git
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/buffercache/read/SinglePageRangeComputer.java
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/buffercache/read/SinglePageRangeComputer.java
new file mode 100644
index 0000000000..4c9536ba33
--- /dev/null
+++
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/buffercache/read/SinglePageRangeComputer.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.storage.am.lsm.btree.column.cloud.buffercache.read;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.storage.common.buffercache.IBufferCache;
+
+/**
+ * Collapses all added ranges into a single range that spans from the start of
+ * the first added range to the end of the last added range.
+ */
+final class SinglePageRangeComputer extends AbstractPageRangesComputer {
+ int rangeStart;
+ int rangeEnd;
+ private int numberOfRanges;
+
+ @Override
+ int getMaxNumberOfRanges() {
+ return 1;
+ }
+
+ @Override
+ void clear() {
+ numberOfRanges = 0;
+ requestedPages.clear();
+ }
+
+ @Override
+ void addRange(int start, int end) {
+ if (numberOfRanges++ == 0) {
+ rangeStart = start;
+ }
+ rangeEnd = end;
+ requestedPages.set(start, end + 1);
+ }
+
+ @Override
+ void pin(CloudMegaPageReadContext ctx, IBufferCache bufferCache, int
fileId, int pageZeroId)
+ throws HyracksDataException {
+ int numberOfPages = rangeEnd - rangeStart + 1;
+ ctx.pin(bufferCache, fileId, pageZeroId, rangeStart, numberOfPages,
requestedPages);
+ }
+}
diff --git
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/test/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/buffercache/read/MergedPageRagesTest.java
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/test/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/buffercache/read/MergedPageRagesTest.java
index 169c14da47..c5210d3459 100644
---
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/test/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/buffercache/read/MergedPageRagesTest.java
+++
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/test/java/org/apache/hyracks/storage/am/lsm/btree/column/cloud/buffercache/read/MergedPageRagesTest.java
@@ -18,40 +18,29 @@
*/
package org.apache.hyracks.storage.am.lsm.btree.column.cloud.buffercache.read;
-import java.util.List;
-
-import org.apache.hyracks.storage.am.lsm.btree.column.cloud.IntPairUtil;
import org.junit.Assert;
import org.junit.Test;
-import it.unimi.dsi.fastutil.Pair;
+import it.unimi.dsi.fastutil.ints.IntList;
public class MergedPageRagesTest {
- private MergedPageRanges mergedPageRanges;
- private final CloudMegaPageReadContext cloudMegaPageReadContext = null;
@Test
public void mergePageRanges1() {
int[] pageRanges = new int[] { 1, 3, 5, 7, 8, 12, 16, 19, 23, 26 };
int requiredRangeCount = 3;
- mergedPageRanges = new MergedPageRanges(cloudMegaPageReadContext,
requiredRangeCount);
+ AbstractPageRangesComputer mergedPageRanges =
AbstractPageRangesComputer.create(requiredRangeCount);
for (int i = 0; i < pageRanges.length; i += 2) {
mergedPageRanges.addRange(pageRanges[i], pageRanges[i + 1]);
}
- mergedPageRanges.mergeRanges();
// since the gaps are in following order
// ( 2, 1, 4, 4 )
// since we need 3 ranges, 5 - 3 = 2 merges should be done.
// hence the resultant ranges := ( 0, 2 ), ( 3, 3 ), ( 4, 4 )
- List<Pair<Integer, Integer>> ranges = List.of(Pair.of(0, 2),
Pair.of(3, 3), Pair.of(4, 4));
-
- for (int i = 0; i < requiredRangeCount; i++) {
- long nextRange = mergedPageRanges.getNextRange();
- Assert.assertEquals(ranges.get(i).first().intValue(),
IntPairUtil.getFirst(nextRange));
- Assert.assertEquals(ranges.get(i).second().intValue(),
IntPairUtil.getSecond(nextRange));
- }
+ int[] expectedRanges = new int[] { 1, 12, 16, 19, 23, 26 };
+ assertResult(mergedPageRanges, expectedRanges);
}
@Test
@@ -59,23 +48,17 @@ public class MergedPageRagesTest {
int[] pageRanges = new int[] { 1, 3, 5, 7, 8, 12, 16, 19, 23, 26 };
int requiredRangeCount = 1;
- mergedPageRanges = new MergedPageRanges(cloudMegaPageReadContext,
requiredRangeCount);
+ AbstractPageRangesComputer mergedPageRanges =
AbstractPageRangesComputer.create(requiredRangeCount);
for (int i = 0; i < pageRanges.length; i += 2) {
mergedPageRanges.addRange(pageRanges[i], pageRanges[i + 1]);
}
- mergedPageRanges.mergeRanges();
// since the gaps are in following order
// ( 2, 1, 4, 4 )
// since we need 1 ranges, 5 - 4 = 1 merge should be done.
// hence the resultant ranges := ( 0, 4)
- List<Pair<Integer, Integer>> ranges = List.of(Pair.of(0, 4));
-
- for (int i = 0; i < requiredRangeCount; i++) {
- long nextRange = mergedPageRanges.getNextRange();
- Assert.assertEquals(ranges.get(i).first().intValue(),
IntPairUtil.getFirst(nextRange));
- Assert.assertEquals(ranges.get(i).second().intValue(),
IntPairUtil.getSecond(nextRange));
- }
+ int[] expectedRanges = new int[] { 1, 26 };
+ assertResult(mergedPageRanges, expectedRanges);
}
@Test
@@ -83,22 +66,92 @@ public class MergedPageRagesTest {
int[] pageRanges = new int[] { 1, 3, 5, 7, 8, 12, 16, 19, 23, 26 };
int requiredRangeCount = 8;
- mergedPageRanges = new MergedPageRanges(cloudMegaPageReadContext,
requiredRangeCount);
+ AbstractPageRangesComputer mergedPageRanges =
AbstractPageRangesComputer.create(requiredRangeCount);
for (int i = 0; i < pageRanges.length; i += 2) {
mergedPageRanges.addRange(pageRanges[i], pageRanges[i + 1]);
}
- mergedPageRanges.mergeRanges();
// since the gaps are in following order
// ( 2, 1, 4, 4 )
// since we need 8 ranges, no merge should be done.
- List<Pair<Integer, Integer>> ranges =
- List.of(Pair.of(0, 0), Pair.of(1, 1), Pair.of(2, 2),
Pair.of(3, 3), Pair.of(4, 4));
+ int[] expectedRanges = new int[] { 1, 3, 5, 7, 8, 12, 16, 19, 23, 26 };
+
+ assertResult(mergedPageRanges, expectedRanges);
+ }
+
+ @Test
+ public void singlePageGap() {
+ int[] pageRanges = new int[] { 1, 3, 5, 7, 9, 10 };
+
+ int requiredRangeCount = 3;
+ AbstractPageRangesComputer mergedPageRanges =
AbstractPageRangesComputer.create(requiredRangeCount);
+ for (int i = 0; i < pageRanges.length; i += 2) {
+ mergedPageRanges.addRange(pageRanges[i], pageRanges[i + 1]);
+ }
+
+ // Since the gaps between the ranges are all 1, the result should be a
single range
+ int[] expectedRanges = new int[] { 1, 10 };
+
+ assertResult(mergedPageRanges, expectedRanges);
+ }
+
+ private void assertResult(AbstractPageRangesComputer mergedPageRanges,
int[] expectedRanges) {
+ if (mergedPageRanges.getMaxNumberOfRanges() == 1) {
+ assertSinglePageRangeComputer((SinglePageRangeComputer)
mergedPageRanges, expectedRanges);
+ return;
+ }
+ PageRangesComputer pageRangesComputer = (PageRangesComputer)
mergedPageRanges;
+ int mergeResult = pageRangesComputer.mergeRanges();
+ switch (mergeResult) {
+ case PageRangesComputer.SINGLE_RANGE:
+ assertSingleRange(pageRangesComputer, expectedRanges);
+ break;
+ case PageRangesComputer.EACH_RANGE:
+ assertEachRange(pageRangesComputer, expectedRanges);
+ break;
+ default:
+ assertBoundaries(pageRangesComputer, expectedRanges);
+ break;
+ }
+ }
+
+ private void assertSinglePageRangeComputer(SinglePageRangeComputer
mergedPageRanges, int[] expectedRanges) {
+ Assert.assertEquals(2, expectedRanges.length); // expected first: a single range is one [start, end] pair
+ Assert.assertEquals(expectedRanges[0], mergedPageRanges.rangeStart);
+ Assert.assertEquals(expectedRanges[1], mergedPageRanges.rangeEnd);
+ }
+
+ private void assertSingleRange(PageRangesComputer mergedPageRanges, int[]
expectedRanges) {
+ Assert.assertEquals(2, expectedRanges.length); // expected first: a single range is one [start, end] pair
+ IntList pageRanges = mergedPageRanges.pageRanges;
+ Assert.assertEquals(expectedRanges[0], pageRanges.getInt(0));
+ Assert.assertEquals(expectedRanges[1],
pageRanges.getInt(pageRanges.size() - 1));
+ }
+
+ private void assertEachRange(PageRangesComputer mergedPageRanges, int[]
expectedRanges) {
+ IntList pageRanges = mergedPageRanges.pageRanges;
+ Assert.assertEquals(expectedRanges.length, pageRanges.size());
+ for (int i = 0; i < expectedRanges.length; i++) {
+ Assert.assertEquals(expectedRanges[i], pageRanges.getInt(i));
+ }
+ }
- for (int i = 0; i < ranges.size(); i++) {
- long nextRange = mergedPageRanges.getNextRange();
- Assert.assertEquals(ranges.get(i).first().intValue(),
IntPairUtil.getFirst(nextRange));
- Assert.assertEquals(ranges.get(i).second().intValue(),
IntPairUtil.getSecond(nextRange));
+ private void assertBoundaries(PageRangesComputer mergedPageRanges, int[]
expectedRanges) {
+ int[] rangeBoundaries = mergedPageRanges.rangeBoundaries;
+ Assert.assertEquals(expectedRanges.length / 2, rangeBoundaries.length);
+ IntList pageRanges = mergedPageRanges.pageRanges;
+ int startIndex = 0;
+ int expectedRange = 0;
+ for (int i = 0; i < rangeBoundaries.length; i++) {
+ int endIndex = rangeBoundaries[i];
+ int rangeStart = pageRanges.getInt(startIndex);
+ int rangeEnd = pageRanges.getInt(endIndex);
+
+ Assert.assertEquals(expectedRanges[expectedRange++], rangeStart);
+ Assert.assertEquals(expectedRanges[expectedRange++], rangeEnd);
+
+ // Start from the next cut
+ startIndex = endIndex + 1;
}
}
}