This is an automated email from the ASF dual-hosted git repository.
ycai pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra-sidecar.git
The following commit(s) were added to refs/heads/trunk by this push:
new 8d88fba7 CASSSIDECAR-158: Adding APIs required for CDC (#147)
8d88fba7 is described below
commit 8d88fba753db9a464bb0b562b87bd0d5b6271c69
Author: Jyothsna konisa <[email protected]>
AuthorDate: Mon Dec 2 15:28:18 2024 -0800
CASSSIDECAR-158: Adding APIs required for CDC (#147)
Patch by Jyothsna Konisa, Saranya Krishnakumar, Yifan Cai; Reviewed by
Bernardo Botella, Francisco Guerrero, James Berragan, Yifan Cai for
CASSSIDECAR-158
Co-authored-by: Jyothsna Konisa <[email protected]>
Co-authored-by: Saranya Krishnakumar <[email protected]>
Co-authored-by: Yifan Cai <[email protected]>
---
.../cassandra/sidecar/common/ApiEndpointsV1.java | 7 +
.../common/response/ListCdcSegmentsResponse.java | 68 +++++
.../common/response/data/CdcSegmentInfo.java | 48 ++++
.../cassandra/sidecar/common/utils/HttpRange.java | 6 +-
.../sidecar/common/utils/Preconditions.java | 31 +++
.../response/ListCdcSegmentsResponseTest.java | 56 ++++
.../sidecar/common/utils/HttpRangeTest.java | 4 +-
server/src/main/dist/conf/sidecar.yaml | 5 +
.../apache/cassandra/sidecar/cdc/CdcLogCache.java | 205 +++++++++++++++
.../sidecar/cluster/instance/InstanceMetadata.java | 5 +
.../cluster/instance/InstanceMetadataImpl.java | 21 ++
.../cassandra/sidecar/config/CdcConfiguration.java | 29 +++
.../sidecar/config/InstanceConfiguration.java | 5 +
.../sidecar/config/ServiceConfiguration.java | 5 +
.../sidecar/config/SidecarConfiguration.java | 1 +
.../sidecar/config/yaml/CdcConfigurationImpl.java | 50 ++++
.../config/yaml/InstanceConfigurationImpl.java | 16 ++
.../config/yaml/ServiceConfigurationImpl.java | 31 +++
.../sidecar/routes/cdc/ListCdcDirHandler.java | 149 +++++++++++
.../routes/cdc/StreamCdcSegmentHandler.java | 196 ++++++++++++++
.../cassandra/sidecar/server/MainModule.java | 11 +
.../apache/cassandra/sidecar/utils/CdcUtil.java | 144 +++++++++++
.../cassandra/sidecar/utils/FileStreamer.java | 5 +-
.../org/apache/cassandra/sidecar/TestModule.java | 10 +-
.../cassandra/sidecar/cdc/CDCLogCacheTest.java | 143 +++++++++++
.../sidecar/config/SidecarConfigurationTest.java | 9 +
.../sidecar/routes/cdc/ListCdcDirHandlerTest.java | 132 ++++++++++
.../routes/cdc/StreamCdcSegmentHandlerTest.java | 286 +++++++++++++++++++++
.../restore/RestoreJobProgressHandlerTest.java | 1 -
server/src/test/resources/config/sidecar_cdc.yaml | 73 ++++++
.../resources/instance1/cdc_raw/CommitLog-1-1.log | 1 +
.../instance1/cdc_raw/CommitLog-1-1_cdc.idx | 2 +
.../resources/instance1/cdc_raw/CommitLog-1-2.log | 1 +
.../instance1/cdc_raw/CommitLog-1-2_cdc.idx | 1 +
.../resources/instance1/cdc_raw/CommitLog-1-3.log | 1 +
35 files changed, 1750 insertions(+), 8 deletions(-)
diff --git
a/client-common/src/main/java/org/apache/cassandra/sidecar/common/ApiEndpointsV1.java
b/client-common/src/main/java/org/apache/cassandra/sidecar/common/ApiEndpointsV1.java
index ec4c7b7e..be49b511 100644
---
a/client-common/src/main/java/org/apache/cassandra/sidecar/common/ApiEndpointsV1.java
+++
b/client-common/src/main/java/org/apache/cassandra/sidecar/common/ApiEndpointsV1.java
@@ -109,6 +109,13 @@ public final class ApiEndpointsV1
public static final String ABORT_RESTORE_JOB_ROUTE = RESTORE_JOB_ROUTE +
ABORT;
public static final String RESTORE_JOB_PROGRESS_ROUTE = RESTORE_JOB_ROUTE
+ PROGRESS;
+ // CDC APIs
+ public static final String CDC_PATH = "/cdc";
+ public static final String SEGMENT_PATH_PARAM = ":segment";
+ public static final String LIST_CDC_SEGMENTS_ROUTE = API_V1 + CDC_PATH +
"/segments";
+ public static final String STREAM_CDC_SEGMENTS_ROUTE =
LIST_CDC_SEGMENTS_ROUTE + "/" + SEGMENT_PATH_PARAM;
+
+
public static final String CONNECTED_CLIENT_STATS_ROUTE = API_V1 +
CASSANDRA + "/stats/connected-clients";
private ApiEndpointsV1()
diff --git
a/client-common/src/main/java/org/apache/cassandra/sidecar/common/response/ListCdcSegmentsResponse.java
b/client-common/src/main/java/org/apache/cassandra/sidecar/common/response/ListCdcSegmentsResponse.java
new file mode 100644
index 00000000..8b72cb2b
--- /dev/null
+++
b/client-common/src/main/java/org/apache/cassandra/sidecar/common/response/ListCdcSegmentsResponse.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.common.response;
+
+import java.util.Collections;
+import java.util.List;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.cassandra.sidecar.common.response.data.CdcSegmentInfo;
+
+/**
+ * A class representing a response for a list CDC Segment request
+ */
+@JsonIgnoreProperties(ignoreUnknown = true)
+@JsonInclude(JsonInclude.Include.NON_NULL)
+public class ListCdcSegmentsResponse
+{
+ private final String host;
+ private final int port;
+ private final List<CdcSegmentInfo> segmentInfos;
+
+ @JsonCreator
+ public ListCdcSegmentsResponse(@JsonProperty("host") String host,
+ @JsonProperty("port") int port,
+ @JsonProperty("segmentInfos")
List<CdcSegmentInfo> segmentsInfo)
+ {
+ this.host = host;
+ this.port = port;
+ this.segmentInfos = Collections.unmodifiableList(segmentsInfo);
+ }
+
+ @JsonProperty("host")
+ public String host()
+ {
+ return host;
+ }
+
+ @JsonProperty("port")
+ public int port()
+ {
+ return port;
+ }
+
+ @JsonProperty("segmentInfos")
+ public List<CdcSegmentInfo> segmentInfos()
+ {
+ return segmentInfos;
+ }
+}
diff --git
a/client-common/src/main/java/org/apache/cassandra/sidecar/common/response/data/CdcSegmentInfo.java
b/client-common/src/main/java/org/apache/cassandra/sidecar/common/response/data/CdcSegmentInfo.java
new file mode 100644
index 00000000..1745fa9d
--- /dev/null
+++
b/client-common/src/main/java/org/apache/cassandra/sidecar/common/response/data/CdcSegmentInfo.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.common.response.data;
+
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+/**
+ * Class representing segment information.
+ */
+@JsonIgnoreProperties(ignoreUnknown = true)
+@JsonInclude(JsonInclude.Include.NON_NULL)
+public class CdcSegmentInfo
+{
+ public final String name;
+ public final long size;
+ public final long idx;
+ public final boolean completed;
+ public final long lastModifiedTimestamp;
+
+ public CdcSegmentInfo(@JsonProperty("name") String name,
@JsonProperty("size") long size,
+ @JsonProperty("idx") long idx,
@JsonProperty("completed") boolean completed,
+ @JsonProperty("lastModifiedTimestamp") long
lastModifiedTimestamp)
+ {
+ this.name = name;
+ this.size = size;
+ this.idx = idx;
+ this.completed = completed;
+ this.lastModifiedTimestamp = lastModifiedTimestamp;
+ }
+}
diff --git
a/client-common/src/main/java/org/apache/cassandra/sidecar/common/utils/HttpRange.java
b/client-common/src/main/java/org/apache/cassandra/sidecar/common/utils/HttpRange.java
index a4039da4..9f155aa8 100644
---
a/client-common/src/main/java/org/apache/cassandra/sidecar/common/utils/HttpRange.java
+++
b/client-common/src/main/java/org/apache/cassandra/sidecar/common/utils/HttpRange.java
@@ -44,8 +44,10 @@ public class HttpRange
// An initialized range is always valid; invalid params fail range
initialization.
private HttpRange(final long start, final long end)
{
- Preconditions.checkArgument(start >= 0, "Range start can not be
negative");
- Preconditions.checkArgument(end >= start, "Range does not satisfy
boundary requirements");
+ Preconditions.checkArgument(start >= 0,
+ () -> String.format("Range start can not
be negative. range=[%s, %s]", start, end));
+ Preconditions.checkArgument(end >= start,
+ () -> String.format("Range does not
satisfy boundary requirements. range=[%s, %s]", start, end));
this.start = start;
this.end = end;
long len = end - start + 1; // Assign long max if overflows
diff --git
a/client-common/src/main/java/org/apache/cassandra/sidecar/common/utils/Preconditions.java
b/client-common/src/main/java/org/apache/cassandra/sidecar/common/utils/Preconditions.java
index f6445b01..ca845d30 100644
---
a/client-common/src/main/java/org/apache/cassandra/sidecar/common/utils/Preconditions.java
+++
b/client-common/src/main/java/org/apache/cassandra/sidecar/common/utils/Preconditions.java
@@ -18,6 +18,8 @@
package org.apache.cassandra.sidecar.common.utils;
+import java.util.function.Supplier;
+
/**
* A class to prevent introducing a dependency to guava in common and client
*/
@@ -39,6 +41,21 @@ public class Preconditions
}
}
+ /**
+ * Similar to {@link #checkArgument(boolean, String)}, but allows the
error message to be evaluated lazily
+ *
+ * @param validCondition the condition to evaluate
+ * @param errorMessageSupplier supplies the error message to use for the
{@link IllegalArgumentException}
+ * @throws IllegalArgumentException when the condition is not valid (i.e.
{@code false})
+ */
+ public static void checkArgument(boolean validCondition, Supplier<String>
errorMessageSupplier)
+ {
+ if (!validCondition)
+ {
+ throw new IllegalArgumentException(errorMessageSupplier.get());
+ }
+ }
+
/**
* Throws an {@link IllegalStateException} when the {@code validCondition}
is {@code false}, otherwise
* no action is taken.
@@ -53,4 +70,18 @@ public class Preconditions
throw new IllegalStateException(errorMessage);
}
}
+
+ /**
+ * Similar to {@link #checkState(boolean, String)}, but allows the error
message to be evaluated lazily
+ *
+ * @param validCondition the condition to evaluate
+ * @param errorMessageSupplier supplies the error message to use for the
{@link IllegalStateException}
+ */
+ public static void checkState(boolean validCondition, Supplier<String>
errorMessageSupplier)
+ {
+ if (!validCondition)
+ {
+ throw new IllegalStateException(errorMessageSupplier.get());
+ }
+ }
}
diff --git
a/client-common/src/test/java/org/apache/cassandra/sidecar/common/response/ListCdcSegmentsResponseTest.java
b/client-common/src/test/java/org/apache/cassandra/sidecar/common/response/ListCdcSegmentsResponseTest.java
new file mode 100644
index 00000000..d057c55d
--- /dev/null
+++
b/client-common/src/test/java/org/apache/cassandra/sidecar/common/response/ListCdcSegmentsResponseTest.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.common.response;
+
+import java.util.Arrays;
+import java.util.List;
+
+import org.junit.jupiter.api.Test;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.cassandra.sidecar.common.response.data.CdcSegmentInfo;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * Test {@link ListCdcSegmentsResponse}
+ */
+class ListCdcSegmentsResponseTest
+{
+ @Test
+ void testSerDeser() throws Exception
+ {
+ List<CdcSegmentInfo> segments = Arrays.asList(new
CdcSegmentInfo("commit-log1", 100, 100, true, 1732148713725L),
+ new
CdcSegmentInfo("commit-log2", 100, 10, false, 1732148713725L));
+ ListCdcSegmentsResponse response = new
ListCdcSegmentsResponse("localhost", 9043, segments);
+ ObjectMapper mapper = new ObjectMapper();
+ String json = mapper.writeValueAsString(response);
+ assertThat(json).isEqualTo("{\"host\":\"localhost\"," +
+ "\"port\":9043," +
+ "\"segmentInfos\":[" +
+
"{\"name\":\"commit-log1\",\"size\":100,\"idx\":100,\"completed\":true,\"lastModifiedTimestamp\":1732148713725},"
+
+
"{\"name\":\"commit-log2\",\"size\":100,\"idx\":10,\"completed\":false,\"lastModifiedTimestamp\":1732148713725}]}");
+ ListCdcSegmentsResponse deserialized = mapper.readValue(json,
ListCdcSegmentsResponse.class);
+ assertThat(deserialized.host()).isEqualTo("localhost");
+ assertThat(deserialized.port()).isEqualTo(9043);
+ assertThat(deserialized.segmentInfos()).hasSize(2);
+
assertThat(deserialized.segmentInfos().get(0).name).isEqualTo("commit-log1");
+
assertThat(deserialized.segmentInfos().get(1).name).isEqualTo("commit-log2");
+ }
+}
diff --git
a/client-common/src/test/java/org/apache/cassandra/sidecar/common/utils/HttpRangeTest.java
b/client-common/src/test/java/org/apache/cassandra/sidecar/common/utils/HttpRangeTest.java
index efc96130..4ad8bdba 100644
---
a/client-common/src/test/java/org/apache/cassandra/sidecar/common/utils/HttpRangeTest.java
+++
b/client-common/src/test/java/org/apache/cassandra/sidecar/common/utils/HttpRangeTest.java
@@ -96,7 +96,7 @@ public class HttpRangeTest
{
HttpRange.parseHeader(rangeHeader, Long.MAX_VALUE);
});
- String msg = "Range does not satisfy boundary requirements";
+ String msg = "Range does not satisfy boundary requirements.
range=[9223372036854775807, 9223372036854775806]";
assertEquals(msg, thrownException.getMessage());
}
@@ -108,7 +108,7 @@ public class HttpRangeTest
{
HttpRange.parseHeader(rangeHeader, Long.MAX_VALUE);
});
- String msg = "Range does not satisfy boundary requirements";
+ String msg = "Range does not satisfy boundary requirements. range=[9,
2]";
assertEquals(msg, thrownException.getMessage());
}
diff --git a/server/src/main/dist/conf/sidecar.yaml
b/server/src/main/dist/conf/sidecar.yaml
index 8fe0d742..1588bcf2 100644
--- a/server/src/main/dist/conf/sidecar.yaml
+++ b/server/src/main/dist/conf/sidecar.yaml
@@ -26,6 +26,7 @@ cassandra_instances:
data_dirs:
- ~/.ccm/test/node1/data0
staging_dir: ~/.ccm/test/node1/sstable-staging
+ cdc_dir: ~/.ccm/test/node1/cdc_raw
jmx_host: 127.0.0.1
jmx_port: 7100
jmx_ssl_enabled: false
@@ -37,6 +38,7 @@ cassandra_instances:
data_dirs:
- ~/.ccm/test/node2/data0
staging_dir: ~/.ccm/test/node2/sstable-staging
+ cdc_dir: ~/.ccm/test/node2/cdc_raw
jmx_host: 127.0.0.1
jmx_port: 7200
jmx_ssl_enabled: false
@@ -48,6 +50,7 @@ cassandra_instances:
data_dirs:
- ~/.ccm/test/node3/data0
staging_dir: ~/.ccm/test/node3/sstable-staging
+ cdc_dir: ~/.ccm/test/node3/cdc_raw
jmx_host: 127.0.0.1
jmx_port: 7300
jmx_ssl_enabled: false
@@ -86,6 +89,8 @@ sidecar:
snapshot_list_cache:
expire_after_access_millis: 7200000 # 2 hours
maximum_size: 10000
+ cdc:
+ segment_hardlink_cache_expiry_in_secs: 300 # 5 mins
worker_pools:
service:
name: "sidecar-worker-pool"
diff --git
a/server/src/main/java/org/apache/cassandra/sidecar/cdc/CdcLogCache.java
b/server/src/main/java/org/apache/cassandra/sidecar/cdc/CdcLogCache.java
new file mode 100644
index 00000000..eb1fb894
--- /dev/null
+++ b/server/src/main/java/org/apache/cassandra/sidecar/cdc/CdcLogCache.java
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.cdc;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.FileAlreadyExistsException;
+import java.nio.file.Files;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.RemovalListener;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import org.apache.cassandra.sidecar.cluster.InstancesConfig;
+import org.apache.cassandra.sidecar.cluster.instance.InstanceMetadata;
+import org.apache.cassandra.sidecar.concurrent.ExecutorPools;
+import org.apache.cassandra.sidecar.concurrent.TaskExecutorPool;
+import org.apache.cassandra.sidecar.config.SidecarConfiguration;
+import org.apache.cassandra.sidecar.utils.CdcUtil;
+
+/**
+ * CdcLogCache caches the recently downloaded files to prevent them from being
deleted by accident.
+ * <br>
+ * Downloads are tracked via {@linkplain #touch}.
+ * <br>
+ * Deleting the _consumed_ files supersedes the caching, meaning the
_consumed_ files and their links
+ * are deleted even if they are still within the cache duration.
+ */
+@Singleton
+public class CdcLogCache
+{
+ public static final String TEMP_DIR_SUFFIX = "_tmp";
+
+ private static final Logger LOGGER =
LoggerFactory.getLogger(CdcLogCache.class);
+ private static final RemovalListener<File, File> hardlinkRemover =
notification -> deleteFileIfExist(notification.getValue());
+
+ private final AtomicBoolean isInitialized = new AtomicBoolean(false);
+ private final TaskExecutorPool internalExecutorPool;
+ private final long cacheExpiryInMillis;
+
+ // Cache for the hardlinks. Key: origin file; Value: link file
+ // The entries expire when not accessed for the configured cache expiry duration (5 minutes by default)
+ @VisibleForTesting
+ final Cache<File, File> hardlinkCache;
+
+ @Inject
+ public CdcLogCache(ExecutorPools executorPools,
+ InstancesConfig instancesConfig,
+ SidecarConfiguration sidecarConfig)
+ {
+ this(executorPools, instancesConfig,
+ TimeUnit.SECONDS.toMillis(sidecarConfig.serviceConfiguration()
+ .cdcConfiguration()
+
.segmentHardlinkCacheExpiryInSecs()));
+ }
+
+ @VisibleForTesting
+ CdcLogCache(ExecutorPools executorPools,
+ InstancesConfig instancesConfig,
+ long cacheExpiryInMillis)
+ {
+ this.cacheExpiryInMillis = cacheExpiryInMillis;
+ this.internalExecutorPool = executorPools.internal();
+ this.hardlinkCache = CacheBuilder.newBuilder()
+
.expireAfterAccess(cacheExpiryInMillis, TimeUnit.MILLISECONDS)
+ .removalListener(hardlinkRemover)
+ .build();
+ // Run cleanup in the internal pool to mute any exceptions. The
cleanup is best-effort.
+ internalExecutorPool.runBlocking(() ->
cleanupLinkedFilesOnStartup(instancesConfig));
+ }
+
+ public void initMaybe()
+ {
+ if (isInitialized.get())
+ {
+ return;
+ }
+
+ if (isInitialized.compareAndSet(false, true))
+ {
+ // setup periodic and serial cleanup
+ internalExecutorPool.setPeriodic(cacheExpiryInMillis,
+ id -> hardlinkCache.cleanUp(),
+ true);
+ }
+ }
+
+ public void touch(File segmentFile, File indexFile)
+ {
+ // renew the hardlinks
+ hardlinkCache.getIfPresent(segmentFile);
+ hardlinkCache.getIfPresent(indexFile);
+ }
+
+ /**
+ * Create a hardlink from origin in the cache if the hardlink does not
exist yet.
+ *
+ * @param origin the source file
+ * @return the link file
+ * @throws IOException when an IO exception occurs during link
+ */
+ public File createLinkedFileInCache(File origin) throws IOException
+ {
+ File link = hardlinkCache.getIfPresent(origin);
+ if (link == null)
+ {
+ link = new File(getTempCdcDir(origin.getParent()),
origin.getName());
+ try
+ {
+ // create link and cache it
+ Files.createLink(link.toPath(), origin.toPath());
+ hardlinkCache.put(origin, link);
+ }
+ catch (FileAlreadyExistsException e)
+ {
+ LOGGER.debug("The target of hardlink {} already exists. It
could be created by a concurrent request.", link);
+ }
+ }
+ return link;
+ }
+
+ /**
+ * Clean up the linked file when the application is starting.
+ * There could be files left over if the application crashes during
streaming the CDC segments.
+ * On a new start, the tmp directory for the linked CDC segments should be
empty.
+ * It is called from the constructor of this singleton class.
+ *
+ * @param config instances config
+ */
+ @VisibleForTesting
+ public void cleanupLinkedFilesOnStartup(InstancesConfig config)
+ {
+ for (InstanceMetadata instance : config.instances())
+ {
+ try
+ {
+ cleanupLinkedFiles(instance);
+ }
+ catch (Exception e)
+ {
+ LOGGER.warn("Failed to clean up linked files for instance {}",
instance.id(), e);
+ }
+ }
+ }
+
+ private void cleanupLinkedFiles(InstanceMetadata instance) throws
IOException
+ {
+ File[] files = getTempCdcDir((instance.cdcDir()))
+ .listFiles(f -> CdcUtil.isLogFile(f.getName()) ||
CdcUtil.isIndexFile(f.getName()));
+ if (files == null)
+ return;
+ for (File f : files)
+ {
+ deleteFileIfExist(f);
+ }
+ }
+
+ private static void deleteFileIfExist(File file)
+ {
+ if (file.exists() && file.isFile())
+ {
+ if (file.delete())
+ {
+ LOGGER.debug("Removed the link file={}", file);
+ }
+ }
+ }
+
+ private static File getTempCdcDir(String cdcDir) throws IOException
+ {
+ File dir = new File(cdcDir + TEMP_DIR_SUFFIX);
+ try
+ {
+ Files.createDirectories(dir.toPath());
+ }
+ catch (IOException e)
+ {
+ LOGGER.error("Unable to create temporary CDC directory {}", dir,
e);
+ throw e;
+ }
+ return dir;
+ }
+}
diff --git
a/server/src/main/java/org/apache/cassandra/sidecar/cluster/instance/InstanceMetadata.java
b/server/src/main/java/org/apache/cassandra/sidecar/cluster/instance/InstanceMetadata.java
index b3860e13..5d5ff5c0 100644
---
a/server/src/main/java/org/apache/cassandra/sidecar/cluster/instance/InstanceMetadata.java
+++
b/server/src/main/java/org/apache/cassandra/sidecar/cluster/instance/InstanceMetadata.java
@@ -56,6 +56,11 @@ public interface InstanceMetadata
*/
String stagingDir();
+ /**
+ * @return cdc directory of the cassandra instance
+ */
+ String cdcDir();
+
/**
* @return a {@link CassandraAdapterDelegate} specific for the instance
*/
diff --git
a/server/src/main/java/org/apache/cassandra/sidecar/cluster/instance/InstanceMetadataImpl.java
b/server/src/main/java/org/apache/cassandra/sidecar/cluster/instance/InstanceMetadataImpl.java
index 21cac7d9..c3cded7e 100644
---
a/server/src/main/java/org/apache/cassandra/sidecar/cluster/instance/InstanceMetadataImpl.java
+++
b/server/src/main/java/org/apache/cassandra/sidecar/cluster/instance/InstanceMetadataImpl.java
@@ -42,6 +42,7 @@ public class InstanceMetadataImpl implements InstanceMetadata
private final int port;
private final List<String> dataDirs;
private final String stagingDir;
+ private final String cdcDir;
@Nullable
private final CassandraAdapterDelegate delegate;
private final InstanceMetrics metrics;
@@ -55,6 +56,7 @@ public class InstanceMetadataImpl implements InstanceMetadata
.map(FileUtils::maybeResolveHomeDirectory)
.collect(Collectors.collectingAndThen(Collectors.toList(),
Collections::unmodifiableList));
stagingDir = FileUtils.maybeResolveHomeDirectory(builder.stagingDir);
+ cdcDir = builder().cdcDir;
delegate = builder.delegate;
metrics = builder.metrics;
}
@@ -89,6 +91,12 @@ public class InstanceMetadataImpl implements InstanceMetadata
return stagingDir;
}
+ @Override
+ public String cdcDir()
+ {
+ return cdcDir;
+ }
+
@Override
public @Nullable CassandraAdapterDelegate delegate()
{
@@ -116,6 +124,7 @@ public class InstanceMetadataImpl implements
InstanceMetadata
protected int port;
protected List<String> dataDirs;
protected String stagingDir;
+ protected String cdcDir;
protected CassandraAdapterDelegate delegate;
protected MetricRegistry metricRegistry;
protected InstanceMetrics metrics;
@@ -131,6 +140,7 @@ public class InstanceMetadataImpl implements
InstanceMetadata
port = instanceMetadata.port;
dataDirs = new ArrayList<>(instanceMetadata.dataDirs);
stagingDir = instanceMetadata.stagingDir;
+ cdcDir = instanceMetadata.cdcDir;
delegate = instanceMetadata.delegate;
metrics = instanceMetadata.metrics;
}
@@ -196,6 +206,17 @@ public class InstanceMetadataImpl implements
InstanceMetadata
return update(b -> b.stagingDir = stagingDir);
}
+ /**
+ * Sets the {@code cdcDir} and returns a reference to this Builder
enabling method chaining.
+ *
+ * @param cdcDir the {@code cdcDir} to set
+ * @return a reference to this Builder
+ */
+ public Builder cdcDir(String cdcDir)
+ {
+ return update(b -> b.cdcDir = cdcDir);
+ }
+
/**
* Sets the {@code delegate} and returns a reference to this Builder
enabling method chaining.
*
diff --git
a/server/src/main/java/org/apache/cassandra/sidecar/config/CdcConfiguration.java
b/server/src/main/java/org/apache/cassandra/sidecar/config/CdcConfiguration.java
new file mode 100644
index 00000000..517c6b40
--- /dev/null
+++
b/server/src/main/java/org/apache/cassandra/sidecar/config/CdcConfiguration.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.sidecar.config;
+
+/**
+ * This interface exposes configuration values for CDC.
+ */
+public interface CdcConfiguration
+{
+ /**
+ * @return segment hard link cache expiration time in seconds used in
{@link org.apache.cassandra.sidecar.cdc.CdcLogCache}
+ */
+ long segmentHardlinkCacheExpiryInSecs();
+}
diff --git
a/server/src/main/java/org/apache/cassandra/sidecar/config/InstanceConfiguration.java
b/server/src/main/java/org/apache/cassandra/sidecar/config/InstanceConfiguration.java
index 12108a55..ca4e2892 100644
---
a/server/src/main/java/org/apache/cassandra/sidecar/config/InstanceConfiguration.java
+++
b/server/src/main/java/org/apache/cassandra/sidecar/config/InstanceConfiguration.java
@@ -50,6 +50,11 @@ public interface InstanceConfiguration
*/
String stagingDir();
+ /**
+ * @return cdc directory of the cassandra instance
+ */
+ String cdcDir();
+
/**
* @return the host address of the JMX service for the Cassandra instance
*/
diff --git
a/server/src/main/java/org/apache/cassandra/sidecar/config/ServiceConfiguration.java
b/server/src/main/java/org/apache/cassandra/sidecar/config/ServiceConfiguration.java
index b47caf3a..a5150f79 100644
---
a/server/src/main/java/org/apache/cassandra/sidecar/config/ServiceConfiguration.java
+++
b/server/src/main/java/org/apache/cassandra/sidecar/config/ServiceConfiguration.java
@@ -144,4 +144,9 @@ public interface ServiceConfiguration
* @return the configuration for sidecar schema
*/
SchemaKeyspaceConfiguration schemaKeyspaceConfiguration();
+
+ /**
+ * @return the configuration for cdc
+ */
+ CdcConfiguration cdcConfiguration();
}
diff --git
a/server/src/main/java/org/apache/cassandra/sidecar/config/SidecarConfiguration.java
b/server/src/main/java/org/apache/cassandra/sidecar/config/SidecarConfiguration.java
index 7babd0e7..736415f6 100644
---
a/server/src/main/java/org/apache/cassandra/sidecar/config/SidecarConfiguration.java
+++
b/server/src/main/java/org/apache/cassandra/sidecar/config/SidecarConfiguration.java
@@ -87,4 +87,5 @@ public interface SidecarConfiguration
* @return the configuration for vert.x
*/
@Nullable VertxConfiguration vertxConfiguration();
+
}
diff --git
a/server/src/main/java/org/apache/cassandra/sidecar/config/yaml/CdcConfigurationImpl.java
b/server/src/main/java/org/apache/cassandra/sidecar/config/yaml/CdcConfigurationImpl.java
new file mode 100644
index 00000000..eaccd1e7
--- /dev/null
+++
b/server/src/main/java/org/apache/cassandra/sidecar/config/yaml/CdcConfigurationImpl.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.sidecar.config.yaml;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.cassandra.sidecar.config.CdcConfiguration;
+
+/**
+ * Encapsulate configuration values for CDC
+ */
+public class CdcConfigurationImpl implements CdcConfiguration
+{
+ public static final String SEGMENT_HARD_LINK_CACHE_EXPIRY_IN_SECS_PROPERTY
= "segment_hardlink_cache_expiry_in_secs";
+ public static final long DEFAULT_SEGMENT_HARD_LINK_CACHE_EXPIRY_IN_SECS =
300;
+
+ @JsonProperty(value = SEGMENT_HARD_LINK_CACHE_EXPIRY_IN_SECS_PROPERTY,
defaultValue = DEFAULT_SEGMENT_HARD_LINK_CACHE_EXPIRY_IN_SECS + "")
+ protected final long segmentHardLinkCacheExpiryInSecs;
+
+ public CdcConfigurationImpl()
+ {
+ this.segmentHardLinkCacheExpiryInSecs =
DEFAULT_SEGMENT_HARD_LINK_CACHE_EXPIRY_IN_SECS;
+ }
+
+ public CdcConfigurationImpl(long segmentHardLinkCacheExpiryInSecs)
+ {
+ this.segmentHardLinkCacheExpiryInSecs =
segmentHardLinkCacheExpiryInSecs;
+ }
+
+ @Override
+ @JsonProperty(value = SEGMENT_HARD_LINK_CACHE_EXPIRY_IN_SECS_PROPERTY)
+ public long segmentHardlinkCacheExpiryInSecs()
+ {
+ return segmentHardLinkCacheExpiryInSecs;
+ }
+}
diff --git
a/server/src/main/java/org/apache/cassandra/sidecar/config/yaml/InstanceConfigurationImpl.java
b/server/src/main/java/org/apache/cassandra/sidecar/config/yaml/InstanceConfigurationImpl.java
index 891ae0b7..6d544b92 100644
---
a/server/src/main/java/org/apache/cassandra/sidecar/config/yaml/InstanceConfigurationImpl.java
+++
b/server/src/main/java/org/apache/cassandra/sidecar/config/yaml/InstanceConfigurationImpl.java
@@ -44,6 +44,9 @@ public class InstanceConfigurationImpl implements
InstanceConfiguration
@JsonProperty("staging_dir")
protected final String stagingDir;
+ @JsonProperty("cdc_dir")
+ protected final String cdcDir;
+
@JsonProperty("jmx_host")
protected final String jmxHost;
@@ -66,6 +69,7 @@ public class InstanceConfigurationImpl implements
InstanceConfiguration
this.port = 9042;
this.dataDirs = null;
this.stagingDir = null;
+ this.cdcDir = null;
this.jmxHost = null;
this.jmxPort = 0;
this.jmxSslEnabled = false;
@@ -78,6 +82,7 @@ public class InstanceConfigurationImpl implements
InstanceConfiguration
int port,
List<String> dataDirs,
String stagingDir,
+ String cdcDir,
String jmxHost,
int jmxPort,
boolean jmxSslEnabled,
@@ -89,6 +94,7 @@ public class InstanceConfigurationImpl implements
InstanceConfiguration
this.port = port;
this.dataDirs = Collections.unmodifiableList(dataDirs);
this.stagingDir = stagingDir;
+ this.cdcDir = cdcDir;
this.jmxHost = jmxHost;
this.jmxPort = jmxPort;
this.jmxSslEnabled = jmxSslEnabled;
@@ -146,6 +152,16 @@ public class InstanceConfigurationImpl implements
InstanceConfiguration
return stagingDir;
}
+ /**
+ * Returns the value bound to the {@code cdc_dir} configuration property.
+ * The value is {@code null} when the instance is created through the no-argument constructor.
+ *
+ * @return cdc directory of the cassandra instance
+ */
+ @Override
+ @JsonProperty("cdc_dir")
+ public String cdcDir()
+ {
+ return cdcDir;
+ }
+
/**
* @return the host address of the JMX service for the Cassandra instance
*/
diff --git
a/server/src/main/java/org/apache/cassandra/sidecar/config/yaml/ServiceConfigurationImpl.java
b/server/src/main/java/org/apache/cassandra/sidecar/config/yaml/ServiceConfigurationImpl.java
index 36611a34..8f2c9879 100644
---
a/server/src/main/java/org/apache/cassandra/sidecar/config/yaml/ServiceConfigurationImpl.java
+++
b/server/src/main/java/org/apache/cassandra/sidecar/config/yaml/ServiceConfigurationImpl.java
@@ -25,6 +25,7 @@ import java.util.concurrent.TimeUnit;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.cassandra.sidecar.common.DataObjectBuilder;
+import org.apache.cassandra.sidecar.config.CdcConfiguration;
import org.apache.cassandra.sidecar.config.JmxConfiguration;
import org.apache.cassandra.sidecar.config.SSTableImportConfiguration;
import org.apache.cassandra.sidecar.config.SSTableSnapshotConfiguration;
@@ -34,6 +35,7 @@ import
org.apache.cassandra.sidecar.config.ServiceConfiguration;
import org.apache.cassandra.sidecar.config.ThrottleConfiguration;
import org.apache.cassandra.sidecar.config.TrafficShapingConfiguration;
import org.apache.cassandra.sidecar.config.WorkerPoolConfiguration;
+import org.jetbrains.annotations.Nullable;
/**
* Configuration for the Sidecar Service and configuration of the REST
endpoints in the service
@@ -64,6 +66,7 @@ public class ServiceConfigurationImpl implements
ServiceConfiguration
private static final String JMX_PROPERTY = "jmx";
private static final String TRAFFIC_SHAPING_PROPERTY = "traffic_shaping";
private static final String SCHEMA = "schema";
+ private static final String CDC = "cdc";
protected static final Map<String, WorkerPoolConfiguration>
DEFAULT_WORKER_POOLS_CONFIGURATION
= Collections.unmodifiableMap(new HashMap<String,
WorkerPoolConfiguration>()
{{
@@ -123,6 +126,9 @@ public class ServiceConfigurationImpl implements
ServiceConfiguration
@JsonProperty(value = SCHEMA)
protected final SchemaKeyspaceConfiguration schemaKeyspaceConfiguration;
+ @JsonProperty(value = CDC)
+ protected final CdcConfiguration cdcConfiguration;
+
/**
* Constructs a new {@link ServiceConfigurationImpl} with the default
values
*/
@@ -154,6 +160,7 @@ public class ServiceConfigurationImpl implements
ServiceConfiguration
jmxConfiguration = builder.jmxConfiguration;
trafficShapingConfiguration = builder.trafficShapingConfiguration;
schemaKeyspaceConfiguration = builder.schemaKeyspaceConfiguration;
+ cdcConfiguration = builder.cdcConfiguration;
}
/**
@@ -316,6 +323,17 @@ public class ServiceConfigurationImpl implements
ServiceConfiguration
return schemaKeyspaceConfiguration;
}
+ /**
+ * Returns the CDC settings exposed under the {@code cdc} property of the service configuration.
+ *
+ * @return the configuration for cdc; annotated as nullable, so callers should be prepared for {@code null}
+ */
+ @Override
+ @JsonProperty(value = CDC)
+ @Nullable
+ public CdcConfiguration cdcConfiguration()
+ {
+ return cdcConfiguration;
+ }
+
public static Builder builder()
{
return new Builder();
@@ -343,6 +361,7 @@ public class ServiceConfigurationImpl implements
ServiceConfiguration
protected JmxConfiguration jmxConfiguration = new
JmxConfigurationImpl();
protected TrafficShapingConfiguration trafficShapingConfiguration =
new TrafficShapingConfigurationImpl();
protected SchemaKeyspaceConfiguration schemaKeyspaceConfiguration =
new SchemaKeyspaceConfigurationImpl();
+ protected CdcConfiguration cdcConfiguration = new
CdcConfigurationImpl();
private Builder()
{
@@ -533,6 +552,18 @@ public class ServiceConfigurationImpl implements
ServiceConfiguration
return update(b -> b.schemaKeyspaceConfiguration =
schemaKeyspaceConfiguration);
}
+ /**
+ * Sets the {@code cdcConfiguration} and returns a reference to this Builder enabling
+ * method chaining.
+ *
+ * @param configuration the {@code cdcConfiguration} to set
+ * @return a reference to the Builder
+ */
+ public Builder cdcConfiguration(CdcConfiguration configuration)
+ {
+ return update(b -> b.cdcConfiguration = configuration);
+ }
+
/**
* Returns a {@code ServiceConfigurationImpl} built from the
parameters previously set.
*
diff --git
a/server/src/main/java/org/apache/cassandra/sidecar/routes/cdc/ListCdcDirHandler.java
b/server/src/main/java/org/apache/cassandra/sidecar/routes/cdc/ListCdcDirHandler.java
new file mode 100644
index 00000000..2f7a2540
--- /dev/null
+++
b/server/src/main/java/org/apache/cassandra/sidecar/routes/cdc/ListCdcDirHandler.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.routes.cdc;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.nio.file.attribute.BasicFileAttributes;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import io.netty.handler.codec.http.HttpResponseStatus;
+import io.vertx.core.http.HttpServerRequest;
+import io.vertx.core.net.SocketAddress;
+import io.vertx.ext.web.RoutingContext;
+import org.apache.cassandra.sidecar.common.response.ListCdcSegmentsResponse;
+import org.apache.cassandra.sidecar.common.response.data.CdcSegmentInfo;
+import org.apache.cassandra.sidecar.concurrent.ExecutorPools;
+import org.apache.cassandra.sidecar.concurrent.TaskExecutorPool;
+import org.apache.cassandra.sidecar.config.ServiceConfiguration;
+import org.apache.cassandra.sidecar.config.SidecarConfiguration;
+import org.apache.cassandra.sidecar.routes.AbstractHandler;
+import org.apache.cassandra.sidecar.utils.CassandraInputValidator;
+import org.apache.cassandra.sidecar.utils.CdcUtil;
+import org.apache.cassandra.sidecar.utils.InstanceMetadataFetcher;
+
+import static org.apache.cassandra.sidecar.utils.CdcUtil.getIdxFileName;
+import static org.apache.cassandra.sidecar.utils.CdcUtil.getLogFilePrefix;
+import static org.apache.cassandra.sidecar.utils.CdcUtil.isIndexFile;
+import static org.apache.cassandra.sidecar.utils.CdcUtil.parseIndexFile;
+
+/**
+ * Provides REST endpoint for listing commit logs in CDC directory.
+ */
+@Singleton
+public class ListCdcDirHandler extends AbstractHandler<Void>
+{
+ private static final Logger LOGGER =
LoggerFactory.getLogger(ListCdcDirHandler.class);
+ private final ServiceConfiguration config;
+ private final TaskExecutorPool serviceExecutorPool;
+
+ @Inject
+ public ListCdcDirHandler(InstanceMetadataFetcher metadataFetcher,
+ SidecarConfiguration config,
+ ExecutorPools executorPools,
+ CassandraInputValidator validator)
+ {
+ super(metadataFetcher, executorPools, validator);
+ this.config = config.serviceConfiguration();
+ this.serviceExecutorPool = executorPools.service();
+ }
+
+ @Override
+ protected void handleInternal(RoutingContext context,
+ HttpServerRequest httpRequest,
+ String host,
+ SocketAddress remoteAddress,
+ Void request)
+ {
+ String cdcDir = metadataFetcher.instance(host).cdcDir();
+ serviceExecutorPool
+ .executeBlocking(() -> collectCdcSegmentsFromFileSystem(cdcDir))
+ .map(segments -> new ListCdcSegmentsResponse(config.host(),
config.port(), segments))
+ .onSuccess(context::json)
+ .onFailure(cause -> {
+ LOGGER.warn("Error listing the CDC commit log segments", cause);
+ context.response()
+
.setStatusCode(HttpResponseStatus.INTERNAL_SERVER_ERROR.code())
+ .setStatusMessage(cause.getMessage())
+ .end();
+ });
+ }
+
+ private List<CdcSegmentInfo> collectCdcSegmentsFromFileSystem(String
cdcDirPath) throws IOException
+ {
+ List<CdcSegmentInfo> segmentInfos = new ArrayList<>();
+ File cdcDir = Paths.get(cdcDirPath).toAbsolutePath().toFile();
+ if (!cdcDir.isDirectory())
+ {
+ throw new IOException("CDC directory does not exist");
+ }
+
+ File[] cdcFiles = cdcDir.listFiles();
+ if (cdcFiles == null || cdcFiles.length == 0)
+ {
+ return segmentInfos;
+ }
+
+ Set<String> idxFileNamePrefixes = new HashSet<>();
+ for (File cdcFile : cdcFiles)
+ {
+ if (CdcUtil.matchIndexExtension(cdcFile.getName()))
+ {
+
idxFileNamePrefixes.add(CdcUtil.getIdxFilePrefix(cdcFile.getName()));
+ }
+ }
+
+ for (File cdcFile : cdcFiles)
+ {
+ String fileName = cdcFile.getName();
+ BasicFileAttributes fileAttributes =
Files.readAttributes(cdcFile.toPath(), BasicFileAttributes.class);
+ if (!cdcFile.exists()
+ || !fileAttributes.isRegularFile() // the file just gets
deleted? ignore it
+ || isIndexFile(fileName) // ignore all .idx files
+ || !idxFileNamePrefixes.contains(getLogFilePrefix(fileName)))
// ignore .log files found without matching .idx files
+ {
+ continue;
+ }
+
+ CdcUtil.CdcIndex cdcIndex = parseIndexFile(new File(cdcDirPath,
getIdxFileName(fileName)), fileAttributes.size());
+ CdcSegmentInfo segmentInfo =
+ new CdcSegmentInfo(fileName, fileAttributes.size(),
+ cdcIndex.latestFlushPosition,
cdcIndex.isCompleted,
+ fileAttributes.lastModifiedTime().toMillis());
+ segmentInfos.add(segmentInfo);
+ }
+ return segmentInfos;
+ }
+
+ @Override
+ protected Void extractParamsOrThrow(RoutingContext context)
+ {
+ return null;
+ }
+}
diff --git
a/server/src/main/java/org/apache/cassandra/sidecar/routes/cdc/StreamCdcSegmentHandler.java
b/server/src/main/java/org/apache/cassandra/sidecar/routes/cdc/StreamCdcSegmentHandler.java
new file mode 100644
index 00000000..0c184e77
--- /dev/null
+++
b/server/src/main/java/org/apache/cassandra/sidecar/routes/cdc/StreamCdcSegmentHandler.java
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.routes.cdc;
+
+import java.io.File;
+import java.io.IOException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import io.netty.handler.codec.http.HttpHeaderNames;
+import io.netty.handler.codec.http.HttpResponseStatus;
+import io.vertx.core.Future;
+import io.vertx.core.http.HttpServerRequest;
+import io.vertx.core.net.SocketAddress;
+import io.vertx.ext.web.RoutingContext;
+import io.vertx.ext.web.handler.HttpException;
+import org.apache.cassandra.sidecar.cdc.CdcLogCache;
+import org.apache.cassandra.sidecar.cluster.instance.InstanceMetadata;
+import org.apache.cassandra.sidecar.concurrent.ExecutorPools;
+import org.apache.cassandra.sidecar.concurrent.TaskExecutorPool;
+import org.apache.cassandra.sidecar.models.HttpResponse;
+import org.apache.cassandra.sidecar.routes.AbstractHandler;
+import org.apache.cassandra.sidecar.utils.CassandraInputValidator;
+import org.apache.cassandra.sidecar.utils.CdcUtil;
+import org.apache.cassandra.sidecar.utils.FileStreamer;
+import org.apache.cassandra.sidecar.utils.InstanceMetadataFetcher;
+
+import static org.apache.cassandra.sidecar.utils.CdcUtil.getIdxFileName;
+import static org.apache.cassandra.sidecar.utils.CdcUtil.isLogFile;
+import static org.apache.cassandra.sidecar.utils.CdcUtil.isValid;
+import static org.apache.cassandra.sidecar.utils.CdcUtil.parseIndexFile;
+import static
org.apache.cassandra.sidecar.utils.HttpExceptions.wrapHttpException;
+
+/**
+ * Provides REST endpoint for streaming cdc commit logs.
+ */
+@Singleton
+public class StreamCdcSegmentHandler extends AbstractHandler<String>
+{
+ private static final Logger LOGGER =
LoggerFactory.getLogger(StreamCdcSegmentHandler.class);
+
+ private final FileStreamer fileStreamer;
+ private final CdcLogCache cdcLogCache;
+ private final TaskExecutorPool serviceExecutorPool;
+
+ @Inject
+ public StreamCdcSegmentHandler(InstanceMetadataFetcher metadataFetcher,
+ FileStreamer fileStreamer,
+ CdcLogCache cdcLogCache,
+ ExecutorPools executorPools,
+ CassandraInputValidator validator)
+ {
+ super(metadataFetcher, executorPools, validator);
+ this.fileStreamer = fileStreamer;
+ this.cdcLogCache = cdcLogCache;
+ this.serviceExecutorPool = executorPools.service();
+ }
+
+ @Override
+ protected void handleInternal(RoutingContext context,
+ HttpServerRequest httpRequest,
+ String host,
+ SocketAddress remoteAddress,
+ String segment)
+ {
+ cdcLogCache.initMaybe();
+ InstanceMetadata instance = metadataFetcher.instance(host);
+ String cdcDir = instance.cdcDir();
+ File segmentFile = new File(cdcDir, segment);
+ validateCdcSegmentFile(segmentFile);
+
+ String indexFileName = getIdxFileName(segment);
+ File indexFile = new File(cdcDir, indexFileName);
+ HttpResponse response = new HttpResponse(context.request(),
context.response());
+ streamCdcSegmentAsync(context, segmentFile, indexFile, response,
instance)
+ // Touch the files at the end of the request
+ // If the file exists in cache, its expiry is extended; otherwise, the
cache is not changed.
+ .onSuccess(res -> cdcLogCache.touch(segmentFile, indexFile))
+ .onFailure(cause -> processFailure(cause, context, host,
remoteAddress, segment));
+ }
+
+ private Future<Void> streamCdcSegmentAsync(RoutingContext context,
+ File segmentFile,
+ File indexFile,
+ HttpResponse response,
+ InstanceMetadata instance)
+ {
+ long segmentFileLength = segmentFile.length();
+ return getOrCreateLinkedCdcFilePairAsync(segmentFile, indexFile)
+ .compose(cdcFilePair ->
+ openCdcIndexFileAsync(cdcFilePair)
+ .compose(cdcIndex -> {
+ // stream the segment file; depending on whether
the cdc segment is complete or not, cap the range of file to stream
+ String rangeHeader =
context.request().getHeader(HttpHeaderNames.RANGE);
+ return cdcIndex.isCompleted
+ ?
fileStreamer.parseRangeHeader(rangeHeader, segmentFileLength)
+ :
fileStreamer.parseRangeHeader(rangeHeader, cdcIndex.latestFlushPosition);
+ })
+ .compose(range -> fileStreamer.stream(response,
instance.id(), cdcFilePair.segmentFile.getAbsolutePath(), segmentFileLength,
range))
+ );
+ }
+
+ @Override
+ protected String extractParamsOrThrow(RoutingContext context)
+ {
+ return context.request().getParam("segment");
+ }
+
+ private void validateCdcSegmentFile(File segmentFile) throws HttpException
+ {
+ // We only accept stream request for log files
+ if (!isValid(segmentFile.getName()) ||
!isLogFile(segmentFile.getName()))
+ {
+ throw wrapHttpException(HttpResponseStatus.BAD_REQUEST, "Invalid
path param for CDC segment: " + segmentFile.getName());
+ }
+
+ // check file existence
+ if (!segmentFile.exists())
+ {
+ throw wrapHttpException(HttpResponseStatus.NOT_FOUND, "CDC segment
not found: " + segmentFile.getName());
+ }
+
+ // check file content
+ long fileSize = segmentFile.length();
+ if (fileSize == 0)
+ {
+ throw
wrapHttpException(HttpResponseStatus.REQUESTED_RANGE_NOT_SATISFIABLE, "File is
empty");
+ }
+ }
+
+ private Future<CdcFilePair> getOrCreateLinkedCdcFilePairAsync(File
segmentFile, File indexFile)
+ {
+ return serviceExecutorPool.executeBlocking(() -> {
+ // hardlink the segment and its index file,
+ // in order to guarantee the files exist in the subsequent calls
(within the cache duration)
+ try
+ {
+ File linkedSegmentFile =
cdcLogCache.createLinkedFileInCache(segmentFile);
+ File linkedIndexFile =
cdcLogCache.createLinkedFileInCache(indexFile);
+ return new CdcFilePair(linkedSegmentFile, linkedIndexFile);
+ }
+ catch (IOException e)
+ {
+ LOGGER.debug("Failed to prepare CDC segment to stream", e);
+ throw
wrapHttpException(HttpResponseStatus.INTERNAL_SERVER_ERROR, "Failed to prepare
CDC segment to stream", e);
+ }
+ });
+ }
+
+ private Future<CdcUtil.CdcIndex> openCdcIndexFileAsync(CdcFilePair
cdcFilePair)
+ {
+ return serviceExecutorPool.executeBlocking(() -> {
+ // check index file is accessible
+ try
+ {
+ return parseIndexFile(cdcFilePair.indexFile,
cdcFilePair.segmentFile.length());
+ }
+ catch (IOException e)
+ {
+ LOGGER.error("Failed to read CDC index file", e);
+ throw
wrapHttpException(HttpResponseStatus.INTERNAL_SERVER_ERROR, "Failed to read CDC
index file", e);
+ }
+ });
+ }
+
+ private static class CdcFilePair
+ {
+ private final File segmentFile;
+ private final File indexFile;
+
+ private CdcFilePair(File segmentFile, File indexFile)
+ {
+ this.segmentFile = segmentFile;
+ this.indexFile = indexFile;
+ }
+ }
+}
diff --git
a/server/src/main/java/org/apache/cassandra/sidecar/server/MainModule.java
b/server/src/main/java/org/apache/cassandra/sidecar/server/MainModule.java
index 93b9f810..e135e6a1 100644
--- a/server/src/main/java/org/apache/cassandra/sidecar/server/MainModule.java
+++ b/server/src/main/java/org/apache/cassandra/sidecar/server/MainModule.java
@@ -106,6 +106,8 @@ import
org.apache.cassandra.sidecar.routes.StreamSSTableComponentHandler;
import org.apache.cassandra.sidecar.routes.TimeSkewHandler;
import org.apache.cassandra.sidecar.routes.TokenRangeReplicaMapHandler;
import org.apache.cassandra.sidecar.routes.cassandra.NodeSettingsHandler;
+import org.apache.cassandra.sidecar.routes.cdc.ListCdcDirHandler;
+import org.apache.cassandra.sidecar.routes.cdc.StreamCdcSegmentHandler;
import org.apache.cassandra.sidecar.routes.restore.AbortRestoreJobHandler;
import org.apache.cassandra.sidecar.routes.restore.CreateRestoreJobHandler;
import org.apache.cassandra.sidecar.routes.restore.CreateRestoreSliceHandler;
@@ -255,6 +257,8 @@ public class MainModule extends AbstractModule
SSTableUploadHandler ssTableUploadHandler,
SSTableImportHandler ssTableImportHandler,
SSTableCleanupHandler ssTableCleanupHandler,
+ StreamCdcSegmentHandler streamCdcSegmentHandler,
+ ListCdcDirHandler listCdcDirHandler,
RestoreRequestValidationHandler
validateRestoreJobRequest,
DiskSpaceProtectionHandler diskSpaceProtection,
ValidateTableExistenceHandler
validateTableExistence,
@@ -417,6 +421,12 @@ public class MainModule extends AbstractModule
.handler(validateRestoreJobRequest)
.handler(restoreJobProgressHandler);
+ // CDC APIs
+ router.get(ApiEndpointsV1.LIST_CDC_SEGMENTS_ROUTE)
+ .handler(listCdcDirHandler);
+ router.get(ApiEndpointsV1.STREAM_CDC_SEGMENTS_ROUTE)
+ .handler(streamCdcSegmentHandler);
+
return router;
}
@@ -700,6 +710,7 @@ public class MainModule extends AbstractModule
.port(port)
.dataDirs(cassandraInstance.dataDirs())
.stagingDir(cassandraInstance.stagingDir())
+ .cdcDir(cassandraInstance.cdcDir())
.delegate(delegate)
.metricRegistry(instanceSpecificRegistry)
.build();
diff --git
a/server/src/main/java/org/apache/cassandra/sidecar/utils/CdcUtil.java
b/server/src/main/java/org/apache/cassandra/sidecar/utils/CdcUtil.java
new file mode 100644
index 00000000..91aa3029
--- /dev/null
+++ b/server/src/main/java/org/apache/cassandra/sidecar/utils/CdcUtil.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.utils;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.util.List;
+import java.util.regex.Pattern;
+
/**
 * Class with utility methods for CDC.
 *
 * <p>CDC file names have the form {@code CommitLog-<digits>[-<digits>]} followed by either
 * {@code .log} (the segment) or {@code _cdc.idx} (its index file).
 */
public final class CdcUtil
{
    private static final String SEPARATOR = "-";
    private static final String FILENAME_PREFIX = "CommitLog" + SEPARATOR;
    private static final String LOG_FILE_EXTENSION = ".log";
    private static final String IDX_FILE_EXTENSION = "_cdc.idx";
    private static final int LOG_FILE_EXTENSION_LENGTH = LOG_FILE_EXTENSION.length();
    private static final int IDX_FILE_EXTENSION_LENGTH = IDX_FILE_EXTENSION.length();
    private static final String LOG_FILE_COMPLETE_INDICATOR = "COMPLETED";
    // The extensions are quoted so that the '.' they contain is matched literally instead of as "any character"
    private static final String FILENAME_EXTENSION = "(" + Pattern.quote(IDX_FILE_EXTENSION)
                                                     + "|" + Pattern.quote(LOG_FILE_EXTENSION) + ")";
    private static final Pattern SEGMENT_PATTERN = Pattern.compile(FILENAME_PREFIX + "((\\d+)(" + SEPARATOR + "\\d+)?)"
                                                                   + FILENAME_EXTENSION);

    private static final int READ_INDEX_FILE_MAX_RETRY = 5;

    private CdcUtil()
    {
        throw new UnsupportedOperationException("Do not instantiate.");
    }

    /**
     * @param idxFileName name of an index file
     * @return the name without its {@code _cdc.idx} extension; the name unchanged when it does not carry that extension
     */
    public static String getIdxFilePrefix(String idxFileName)
    {
        return idxFileName.endsWith(IDX_FILE_EXTENSION)
               ? idxFileName.substring(0, idxFileName.length() - IDX_FILE_EXTENSION_LENGTH)
               : idxFileName;
    }

    /**
     * @param logFileName name of a log file
     * @return the name without its {@code .log} extension; the name unchanged when it does not carry that extension
     */
    public static String getLogFilePrefix(String logFileName)
    {
        return logFileName.endsWith(LOG_FILE_EXTENSION)
               ? logFileName.substring(0, logFileName.length() - LOG_FILE_EXTENSION_LENGTH)
               : logFileName;
    }

    /**
     * @param logFileName name of a log file
     * @return the name of the index file that belongs to the log file
     */
    public static String getIdxFileName(String logFileName)
    {
        return logFileName.replace(LOG_FILE_EXTENSION, IDX_FILE_EXTENSION);
    }

    /**
     * @param logFile a log file
     * @return the index file that sits next to the log file
     */
    public static File getIdxFile(File logFile)
    {
        return new File(logFile.getParent(), getIdxFileName(logFile.getName()));
    }

    /**
     * @param indexFileName name of an index file
     * @return the name of the log file that belongs to the index file
     */
    public static String getLogFileName(String indexFileName)
    {
        return indexFileName.replace(IDX_FILE_EXTENSION, LOG_FILE_EXTENSION);
    }

    /**
     * Reads the index file of a CDC segment and interprets its last line.
     *
     * @param indexFile         the index file to read
     * @param segmentFileLength length of the segment file; reported as the flush position of a completed segment
     * @return the parsed index
     * @throws IOException when the file cannot be read, stays empty after the retries,
     *                     or its last line is neither the completion marker nor a number
     */
    public static CdcIndex parseIndexFile(File indexFile, long segmentFileLength) throws IOException
    {
        List<String> lines = null;
        // For an index file, if it exists, it should have non-empty content.
        // Therefore, the lines read from the file should not be empty.
        // If it is empty, retry reading the file. The cause is the contention between the reader and the index file writer.
        // In most case, the loop should only run once.
        for (int i = 0; i < READ_INDEX_FILE_MAX_RETRY; i++)
        {
            try
            {
                lines = Files.readAllLines(indexFile.toPath());
                if (!lines.isEmpty())
                {
                    break;
                }
            }
            catch (IOException e)
            {
                throw new IOException("Unable to parse the CDC segment index file " + indexFile.getName(), e);
            }
        }
        if (lines == null || lines.isEmpty())
        {
            throw new IOException("Unable to read anything from the CDC segment index file " + indexFile.getName());
        }

        final String lastLine = lines.get(lines.size() - 1);
        final boolean isCompleted = lastLine.equals(LOG_FILE_COMPLETE_INDICATOR);
        final long latestPosition;
        if (isCompleted)
        {
            latestPosition = segmentFileLength;
        }
        else
        {
            try
            {
                latestPosition = Long.parseLong(lastLine);
            }
            catch (NumberFormatException e)
            {
                // surface malformed content through the declared checked exception, which callers already handle
                throw new IOException("Unable to parse the CDC segment index file " + indexFile.getName(), e);
            }
        }
        return new CdcIndex(latestPosition, isCompleted);
    }

    /**
     * Class representing Cdc index
     */
    public static class CdcIndex
    {
        public final long latestFlushPosition;
        public final boolean isCompleted;

        public CdcIndex(long latestFlushPosition, boolean isCompleted)
        {
            this.latestFlushPosition = latestFlushPosition;
            this.isCompleted = isCompleted;
        }
    }

    /**
     * Validate for the cdc (log or index) file name. See {@link #SEGMENT_PATTERN} for the format
     * @param fileName name of the file
     * @return true if the name is valid; otherwise, false
     */
    public static boolean isValid(String fileName)
    {
        return SEGMENT_PATTERN.matcher(fileName).matches();
    }

    /**
     * @param fileName name of the file
     * @return true if the name is a valid cdc file name with the {@code .log} extension
     */
    public static boolean isLogFile(String fileName)
    {
        return isValid(fileName) && fileName.endsWith(LOG_FILE_EXTENSION);
    }

    /**
     * @param fileName name of the file
     * @return true if the name is a valid cdc file name with the {@code _cdc.idx} extension
     */
    public static boolean isIndexFile(String fileName)
    {
        return isValid(fileName) && matchIndexExtension(fileName);
    }

    /**
     * @param fileName name of the file
     * @return true if the name ends with the {@code _cdc.idx} extension, regardless of the rest of the name
     */
    public static boolean matchIndexExtension(String fileName)
    {
        return fileName.endsWith(IDX_FILE_EXTENSION);
    }
}
diff --git
a/server/src/main/java/org/apache/cassandra/sidecar/utils/FileStreamer.java
b/server/src/main/java/org/apache/cassandra/sidecar/utils/FileStreamer.java
index 1825863d..10425e17 100644
--- a/server/src/main/java/org/apache/cassandra/sidecar/utils/FileStreamer.java
+++ b/server/src/main/java/org/apache/cassandra/sidecar/utils/FileStreamer.java
@@ -42,6 +42,7 @@ import org.apache.cassandra.sidecar.models.HttpResponse;
import static java.util.concurrent.TimeUnit.MICROSECONDS;
import static
io.netty.handler.codec.http.HttpResponseStatus.REQUESTED_RANGE_NOT_SATISFIABLE;
import static io.netty.handler.codec.http.HttpResponseStatus.TOO_MANY_REQUESTS;
+import static
org.apache.cassandra.sidecar.utils.HttpExceptions.wrapHttpException;
import static
org.apache.cassandra.sidecar.utils.MetricUtils.parseSSTableComponent;
/**
@@ -219,7 +220,7 @@ public class FileStreamer
* @param fileLength The length of the file
* @return a succeeded future when the parsing is successful, a failed
future when the range parsing fails
*/
- private Future<HttpRange> parseRangeHeader(String rangeHeader, long
fileLength)
+ public Future<HttpRange> parseRangeHeader(String rangeHeader, long
fileLength)
{
HttpRange fr = HttpRange.of(0, fileLength - 1);
if (rangeHeader == null)
@@ -236,7 +237,7 @@ public class FileStreamer
catch (IllegalArgumentException | RangeException |
UnsupportedOperationException e)
{
LOGGER.error("Failed to parse header '{}'", rangeHeader, e);
- return Future.failedFuture(new
HttpException(REQUESTED_RANGE_NOT_SATISFIABLE.code()));
+ return
Future.failedFuture(wrapHttpException(REQUESTED_RANGE_NOT_SATISFIABLE,
e.getMessage(), e));
}
}
}
diff --git a/server/src/test/java/org/apache/cassandra/sidecar/TestModule.java
b/server/src/test/java/org/apache/cassandra/sidecar/TestModule.java
index 4b19a3c5..48e76c3b 100644
--- a/server/src/test/java/org/apache/cassandra/sidecar/TestModule.java
+++ b/server/src/test/java/org/apache/cassandra/sidecar/TestModule.java
@@ -42,6 +42,7 @@ import
org.apache.cassandra.sidecar.common.response.NodeSettings;
import org.apache.cassandra.sidecar.common.server.StorageOperations;
import org.apache.cassandra.sidecar.common.server.dns.DnsResolver;
import org.apache.cassandra.sidecar.config.AccessControlConfiguration;
+import org.apache.cassandra.sidecar.config.CdcConfiguration;
import org.apache.cassandra.sidecar.config.HealthCheckConfiguration;
import org.apache.cassandra.sidecar.config.RestoreJobConfiguration;
import org.apache.cassandra.sidecar.config.SSTableUploadConfiguration;
@@ -51,6 +52,7 @@ import
org.apache.cassandra.sidecar.config.SidecarConfiguration;
import org.apache.cassandra.sidecar.config.SslConfiguration;
import org.apache.cassandra.sidecar.config.ThrottleConfiguration;
import org.apache.cassandra.sidecar.config.yaml.AccessControlConfigurationImpl;
+import org.apache.cassandra.sidecar.config.yaml.CdcConfigurationImpl;
import org.apache.cassandra.sidecar.config.yaml.HealthCheckConfigurationImpl;
import org.apache.cassandra.sidecar.config.yaml.RestoreJobConfigurationImpl;
import org.apache.cassandra.sidecar.config.yaml.SSTableUploadConfigurationImpl;
@@ -114,6 +116,7 @@ public class TestModule extends AbstractModule
{
ThrottleConfiguration throttleConfiguration = new
ThrottleConfigurationImpl(5, 5);
SSTableUploadConfiguration uploadConfiguration = new
SSTableUploadConfigurationImpl(0F);
+ CdcConfiguration cdcConfiguration = new CdcConfigurationImpl(1L);
SchemaKeyspaceConfiguration schemaKeyspaceConfiguration =
SchemaKeyspaceConfigurationImpl.builder()
.isEnabled(true)
@@ -126,6 +129,7 @@ public class TestModule extends AbstractModule
.throttleConfiguration(throttleConfiguration)
.schemaKeyspaceConfiguration(schemaKeyspaceConfiguration)
.sstableUploadConfiguration(uploadConfiguration)
+ .cdcConfiguration(cdcConfiguration)
.build();
RestoreJobConfiguration restoreJobConfiguration =
RestoreJobConfigurationImpl.builder()
@@ -167,16 +171,19 @@ public class TestModule extends AbstractModule
1,
"src/test/resources/instance1/data",
"src/test/resources/instance1/sstable-staging",
+
"src/test/resources/instance1/cdc_raw",
true);
InstanceMetadata instance2 = mockInstance("localhost2",
2,
"src/test/resources/instance2/data",
"src/test/resources/instance2/sstable-staging",
+
"src/test/resources/instance2/cdc_raw",
false);
InstanceMetadata instance3 = mockInstance("localhost3",
3,
"src/test/resources/instance3/data",
"src/test/resources/instance3/sstable-staging",
+
"src/test/resources/instance3/cdc_raw",
true);
final List<InstanceMetadata> instanceMetas = new ArrayList<>();
instanceMetas.add(instance1);
@@ -185,13 +192,14 @@ public class TestModule extends AbstractModule
return instanceMetas;
}
- private InstanceMetadata mockInstance(String host, int id, String dataDir,
String stagingDir, boolean isUp)
+ private InstanceMetadata mockInstance(String host, int id, String dataDir,
String stagingDir, String cdcDir, boolean isUp)
{
InstanceMetadata instanceMeta = mock(InstanceMetadata.class);
when(instanceMeta.id()).thenReturn(id);
when(instanceMeta.host()).thenReturn(host);
when(instanceMeta.port()).thenReturn(6475);
when(instanceMeta.stagingDir()).thenReturn(stagingDir);
+ when(instanceMeta.cdcDir()).thenReturn(cdcDir);
List<String> dataDirectories = Collections.singletonList(dataDir);
when(instanceMeta.dataDirs()).thenReturn(dataDirectories);
when(instanceMeta.metrics()).thenReturn(new
InstanceMetricsImpl(registry(id)));
diff --git
a/server/src/test/java/org/apache/cassandra/sidecar/cdc/CDCLogCacheTest.java
b/server/src/test/java/org/apache/cassandra/sidecar/cdc/CDCLogCacheTest.java
new file mode 100644
index 00000000..05875b68
--- /dev/null
+++ b/server/src/test/java/org/apache/cassandra/sidecar/cdc/CDCLogCacheTest.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.cdc;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+
+import com.google.common.base.Preconditions;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import com.google.inject.Guice;
+import com.google.inject.Injector;
+import com.google.inject.util.Modules;
+import io.vertx.core.Vertx;
+import org.apache.cassandra.sidecar.ExecutorPoolsHelper;
+import org.apache.cassandra.sidecar.TestModule;
+import org.apache.cassandra.sidecar.cluster.InstancesConfig;
+import org.apache.cassandra.sidecar.concurrent.ExecutorPools;
+import org.apache.cassandra.sidecar.config.SidecarConfiguration;
+import org.apache.cassandra.sidecar.server.MainModule;
+
+import static org.apache.cassandra.sidecar.AssertionUtils.loopAssert;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatNoException;
+import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+class CdcLogCacheTest
+{
+ private final Injector injector =
Guice.createInjector(Modules.override(new MainModule()).with(new TestModule()));
+ private final InstancesConfig instancesConfig =
injector.getInstance(InstancesConfig.class);
+ private final CdcLogCache logCache = cdcLogCache();
+
+ @BeforeEach
+ void beforeEach()
+ {
+ logCache.initMaybe();
+ logCache.hardlinkCache.invalidateAll();
+ assertThat(logCache.hardlinkCache.size()).isZero();
+ }
+
+ @Test
+ void testLinkedFileExpiryInCache() throws IOException
+ {
+ File commitLogFile = instance1CommitLogFile();
+ File linkedCommitLog = logCache.createLinkedFileInCache(commitLogFile);
+ assertThat(linkedCommitLog).isNotNull();
+ assertThat(logCache.hardlinkCache.size()).isOne();
+
+ // wait for file to expire
+ loopAssert(2, () ->
assertThat(logCache.hardlinkCache.size()).isZero());
+ }
+
+ @Test
+ void testCleanUpLinkedFiles() throws IOException
+ {
+ File commitLogFile = instance1CommitLogFile();
+ File linkedFile = logCache.createLinkedFileInCache(commitLogFile);
+ assertThat(linkedFile.exists()).isTrue();
+
+ // Verify that cleanup deletes the linked file
+ logCache.cleanupLinkedFilesOnStartup(instancesConfig);
+ assertThat(linkedFile.exists()).isFalse();
+ }
+
+ @Test
+ void testCreateLinkedFileInCache() throws IOException
+ {
+ File commitLogFile = instance1CommitLogFile();
+ File linkedCommitLog = logCache.createLinkedFileInCache(commitLogFile);
+
+ // Check if hard link is created
+ assertThat(commitLogFile).isNotEqualTo(linkedCommitLog);
+ assertThat(Files.isSameFile(commitLogFile.toPath(),
linkedCommitLog.toPath())).isTrue();
+
+ // Should return cached linked file in subsequent calls
+
assertThat(linkedCommitLog).isEqualTo(logCache.createLinkedFileInCache(commitLogFile));
+ }
+
+ /**
+ * Failing to clean up shouldn't fail to initialize the class
+ */
+ @Test
+ void testCleanupErrorDoesntPreventInitialization()
+ {
+ assertThatNoException().isThrownBy(() -> {
+ new
FailingCdcLogCache(ExecutorPoolsHelper.createdSharedTestPool(Vertx.vertx()),
instancesConfig, sidecarConfiguration());
+ });
+ }
+
+ private File instance1CommitLogFile()
+ {
+ String commitLogPathOnInstance1 =
instancesConfig.instances().get(0).cdcDir() + "/CommitLog-1-1.log";
+ return new File(commitLogPathOnInstance1);
+ }
+
+ private CdcLogCache cdcLogCache()
+ {
+ ExecutorPools executorPools =
ExecutorPoolsHelper.createdSharedTestPool(Vertx.vertx());
+ return new CdcLogCache(executorPools, instancesConfig, 100L);
+ }
+
+ private SidecarConfiguration sidecarConfiguration()
+ {
+ SidecarConfiguration sidecarConfiguration =
mock(SidecarConfiguration.class, RETURNS_DEEP_STUBS);
+
when(sidecarConfiguration.serviceConfiguration().cdcConfiguration().segmentHardlinkCacheExpiryInSecs()).thenReturn(1L);
+ return sidecarConfiguration;
+ }
+
+ static class FailingCdcLogCache extends CdcLogCache
+ {
+ public FailingCdcLogCache(ExecutorPools executorPools, InstancesConfig
cassandraConfig, SidecarConfiguration sidecarConfig)
+ {
+ super(executorPools, cassandraConfig, sidecarConfig);
+ }
+
+ @Override
+ public void cleanupLinkedFilesOnStartup(InstancesConfig config)
+ {
+ // Fake an error to simulate the initialization issue
+ Preconditions.checkState(false, "cdc_raw_tmp should be a
directory");
+ }
+ }
+}
diff --git
a/server/src/test/java/org/apache/cassandra/sidecar/config/SidecarConfigurationTest.java
b/server/src/test/java/org/apache/cassandra/sidecar/config/SidecarConfigurationTest.java
index a9fd5505..362219a9 100644
---
a/server/src/test/java/org/apache/cassandra/sidecar/config/SidecarConfigurationTest.java
+++
b/server/src/test/java/org/apache/cassandra/sidecar/config/SidecarConfigurationTest.java
@@ -304,6 +304,15 @@ class SidecarConfigurationTest
assertThat(vertxFsOptions.classpathResolvingEnabled()).isTrue();
}
+ @Test
+ void testCdcCofiguration() throws IOException
+ {
+ Path yamlPath = yaml("config/sidecar_cdc.yaml");
+ SidecarConfigurationImpl sidecarConfiguration =
SidecarConfigurationImpl.readYamlConfiguration(yamlPath);
+ assertThat(sidecarConfiguration).isNotNull();
+
assertThat(sidecarConfiguration.serviceConfiguration().cdcConfiguration().segmentHardlinkCacheExpiryInSecs()).isEqualTo(60L);
+ }
+
@Test
void testAccessControlConfiguration() throws Exception
{
diff --git
a/server/src/test/java/org/apache/cassandra/sidecar/routes/cdc/ListCdcDirHandlerTest.java
b/server/src/test/java/org/apache/cassandra/sidecar/routes/cdc/ListCdcDirHandlerTest.java
new file mode 100644
index 00000000..ac206275
--- /dev/null
+++
b/server/src/test/java/org/apache/cassandra/sidecar/routes/cdc/ListCdcDirHandlerTest.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.routes.cdc;
+
+import java.io.IOException;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import com.google.inject.Guice;
+import com.google.inject.Injector;
+import com.google.inject.Module;
+import com.google.inject.util.Modules;
+import io.vertx.core.Vertx;
+import io.vertx.ext.web.client.WebClient;
+import io.vertx.ext.web.client.predicate.ResponsePredicate;
+import io.vertx.ext.web.codec.BodyCodec;
+import io.vertx.junit5.VertxExtension;
+import io.vertx.junit5.VertxTestContext;
+import org.apache.cassandra.sidecar.TestModule;
+import org.apache.cassandra.sidecar.common.response.ListCdcSegmentsResponse;
+import org.apache.cassandra.sidecar.common.response.data.CdcSegmentInfo;
+import org.apache.cassandra.sidecar.server.MainModule;
+import org.apache.cassandra.sidecar.server.Server;
+
+import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
+
+/**
+ * Test class for testing ListCdcDirHandler class.
+ */
+@ExtendWith(VertxExtension.class)
+class ListCdcDirHandlerTest
+{
+ static final Logger LOGGER =
LoggerFactory.getLogger(ListCdcDirHandlerTest.class);
+ private static final String ROUTE = "/api/v1/cdc/segments";
+ private Vertx vertx;
+ private Server server;
+
+ @BeforeEach
+ public void setUp() throws InterruptedException, IOException
+ {
+ Module testOverride = new TestModule();
+ Injector injector = Guice.createInjector(Modules.override(new
MainModule())
+ .with(testOverride));
+ server = injector.getInstance(Server.class);
+ vertx = injector.getInstance(Vertx.class);
+
+ VertxTestContext context = new VertxTestContext();
+ server.start()
+ .onSuccess(s -> context.completeNow())
+ .onFailure(context::failNow);
+ context.awaitCompletion(5, TimeUnit.SECONDS);
+ }
+
+ @AfterEach
+ void after() throws InterruptedException
+ {
+ CountDownLatch closeLatch = new CountDownLatch(1);
+ server.close().onSuccess(res -> closeLatch.countDown());
+ if (closeLatch.await(60, TimeUnit.SECONDS))
+ LOGGER.info("Close event received before timeout.");
+ else
+ LOGGER.error("Close event timed out.");
+ }
+
+ @Test
+ void testRouteSucceeds(VertxTestContext context)
+ {
+ WebClient client = WebClient.create(vertx);
+ client.get(server.actualPort(), "localhost", ROUTE)
+ .as(BodyCodec.buffer())
+ .expect(ResponsePredicate.SC_OK)
+ .send(context.succeeding(resp -> {
+ context.verify(() -> {
+ ListCdcSegmentsResponse listCDCSegmentsResponse =
resp.bodyAsJson(ListCdcSegmentsResponse.class);
+
assertThat(listCDCSegmentsResponse.segmentInfos().size()).isEqualTo(2);
+ for (CdcSegmentInfo segmentInfo :
listCDCSegmentsResponse.segmentInfos())
+ {
+ if (segmentInfo.name.equals("CommitLog-1-1.log"))
+ {
+
assertThat(segmentInfo.name).isEqualTo("CommitLog-1-1.log");
+ assertThat(segmentInfo.idx).isEqualTo(1);
+ assertThat(segmentInfo.size).isEqualTo(1);
+ assertThat(segmentInfo.completed).isTrue();
+ }
+ }
+ });
+ client.close();
+ context.completeNow();
+ }));
+ }
+
+ @Test
+ void testLogFilesWithoutIdxFilesNotStreamed(VertxTestContext context)
+ {
+ WebClient client = WebClient.create(vertx);
+ client.get(server.actualPort(), "localhost", ROUTE)
+ .as(BodyCodec.buffer())
+ .expect(ResponsePredicate.SC_OK)
+ .send(context.succeeding(resp -> {
+ context.verify(() -> {
+ ListCdcSegmentsResponse listCDCSegmentsResponse =
resp.bodyAsJson(ListCdcSegmentsResponse.class);
+ for (CdcSegmentInfo segmentInfo :
listCDCSegmentsResponse.segmentInfos())
+ {
+
assertThat(segmentInfo.name).isNotEqualTo("CommitLog-1-3.log");
+ }
+ });
+ client.close();
+ context.completeNow();
+ }));
+ }
+}
diff --git
a/server/src/test/java/org/apache/cassandra/sidecar/routes/cdc/StreamCdcSegmentHandlerTest.java
b/server/src/test/java/org/apache/cassandra/sidecar/routes/cdc/StreamCdcSegmentHandlerTest.java
new file mode 100644
index 00000000..5f3a1760
--- /dev/null
+++
b/server/src/test/java/org/apache/cassandra/sidecar/routes/cdc/StreamCdcSegmentHandlerTest.java
@@ -0,0 +1,286 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.routes.cdc;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import com.google.common.util.concurrent.Uninterruptibles;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import com.google.inject.Guice;
+import com.google.inject.Injector;
+import com.google.inject.Module;
+import com.google.inject.util.Modules;
+import io.netty.handler.codec.http.HttpHeaderNames;
+import io.vertx.core.Vertx;
+import io.vertx.ext.web.client.WebClient;
+import io.vertx.ext.web.client.predicate.ResponsePredicate;
+import io.vertx.ext.web.codec.BodyCodec;
+import io.vertx.junit5.VertxExtension;
+import io.vertx.junit5.VertxTestContext;
+import org.apache.cassandra.sidecar.TestModule;
+import org.apache.cassandra.sidecar.cdc.CdcLogCache;
+import org.apache.cassandra.sidecar.server.MainModule;
+import org.apache.cassandra.sidecar.server.Server;
+
+import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
+
+/**
+ * Test class for StreamCdcSegmentHandler.
+ */
+@ExtendWith(VertxExtension.class)
+class StreamCdcSegmentHandlerTest
+{
+ public static final String CDC_RAW_DIR =
"./src/test/resources/instance1/cdc_raw";
+ public static final String CDC_RAW_TEMP_DIR = CDC_RAW_DIR +
CdcLogCache.TEMP_DIR_SUFFIX;
+ static final Logger LOGGER =
LoggerFactory.getLogger(StreamCdcSegmentHandlerTest.class);
+ private Vertx vertx;
+ private Server server;
+
+ @BeforeEach
+ void setUp() throws InterruptedException, IOException
+ {
+ Module testOverride = new TestModule();
+ Injector injector = Guice.createInjector(Modules.override(new
MainModule())
+ .with(testOverride));
+ server = injector.getInstance(Server.class);
+ vertx = injector.getInstance(Vertx.class);
+
+ VertxTestContext context = new VertxTestContext();
+ server.start()
+ .onSuccess(s -> context.completeNow())
+ .onFailure(context::failNow);
+ context.awaitCompletion(5, TimeUnit.SECONDS);
+ }
+
+ @AfterEach
+ void after() throws InterruptedException
+ {
+ CountDownLatch closeLatch = new CountDownLatch(1);
+ server.close().onSuccess(res -> closeLatch.countDown());
+ if (closeLatch.await(60, TimeUnit.SECONDS))
+ LOGGER.info("Close event received before timeout.");
+ else
+ LOGGER.error("Close event timed out.");
+ }
+
+ @Test
+ void testRouteSucceeds(VertxTestContext context)
+ {
+ WebClient client = WebClient.create(vertx);
+ String testRoute = "/cdc/segments/CommitLog-1-1.log";
+ client.get(server.actualPort(), "localhost", "/api/v1" + testRoute)
+ .as(BodyCodec.buffer())
+ .expect(ResponsePredicate.SC_OK)
+ .send(context.succeeding(resp -> {
+ context.verify(() ->
assertThat(resp.bodyAsString()).isEqualTo("x"));
+ client.close();
+ context.completeNow();
+ }));
+ spinAssertCdcRawTempEmpty();
+ }
+
+ @Test // A variant of testRouteSucceeds. It sends concurrent requests for
the same segment and asserts that all requests are satisfied.
+ void testConcurrentRequestsForSegmentSucceeds(VertxTestContext context)
+ {
+ int requests = 5;
+ WebClient client = WebClient.create(vertx);
+ CountDownLatch latch = new CountDownLatch(requests);
+ String testRoute = "/cdc/segments/CommitLog-1-1.log";
+ for (int i = 0; i < requests; i++)
+ {
+ client.get(server.actualPort(), "localhost", "/api/v1" + testRoute)
+ .as(BodyCodec.buffer())
+ .expect(ResponsePredicate.SC_OK)
+ .send(context.succeeding(resp -> {
+ context.verify(() ->
assertThat(resp.bodyAsString()).isEqualTo("x"));
+ latch.countDown();
+ if (latch.getCount() == 0)
+ {
+ context.completeNow();
+ }
+ }));
+ }
+ spinAssertCdcRawTempEmpty();
+ }
+
+ @Test // The server should internally re-create hardlinks for each request
in this scenario.
+ void testSequentialRequestsForSegmentSucceeds(VertxTestContext context)
+ {
+ for (int i = 0; i < 3; i++)
+ {
+ testRouteSucceeds(context);
+ }
+ }
+
+ @Test
+ void testSegmentNotFoundSucceeds(VertxTestContext context)
+ {
+ WebClient client = WebClient.create(vertx);
+ String testRoute = "/cdc/segments/CommitLog-1-123456.log";
+ client.get(server.actualPort(), "localhost", "/api/v1" + testRoute)
+ .expect(ResponsePredicate.SC_NOT_FOUND)
+ .send(context.succeeding(resp -> {
+ context.verify(() -> {
+ assertThat(resp.statusCode()).isEqualTo(404);
+ assertThat(resp.statusMessage()).isEqualTo("Not Found");
+ assertThat(resp.bodyAsJsonObject().getString("message"))
+ .isEqualTo("CDC segment not found:
CommitLog-1-123456.log");
+ });
+ client.close();
+ context.completeNow();
+ }));
+ }
+
+ @Test
+ void testIncorrectSegmentExtensionSucceeds(VertxTestContext context)
+ {
+ WebClient client = WebClient.create(vertx);
+ String testRoute = "/cdc/segments/CommitLog-1-1"; // not a valid file
name, missing `.log` extension
+ client.get(server.actualPort(), "localhost", "/api/v1" + testRoute)
+ .expect(ResponsePredicate.SC_BAD_REQUEST)
+ .send(context.succeeding(resp -> {
+ context.verify(() -> {
+ assertThat(resp.statusCode()).isEqualTo(400);
+ assertThat(resp.statusMessage()).isEqualTo("Bad
Request");
+ assertThat(resp.bodyAsJsonObject().getString("message"))
+ .isEqualTo("Invalid path param for CDC segment:
CommitLog-1-1");
+ });
+ context.completeNow();
+ client.close();
+ }));
+ }
+
+ @Test
+ void testInvalidRangeFails(VertxTestContext context)
+ {
+ WebClient client = WebClient.create(vertx);
+ String testRoute = "/cdc/segments/CommitLog-1-1.log";
+ client.get(server.actualPort(), "localhost", "/api/v1" + testRoute)
+ .putHeader("Range", "bytes=4-3")
+ .expect(ResponsePredicate.SC_REQUESTED_RANGE_NOT_SATISFIABLE)
+ .send(context.succeeding(resp -> {
+ context.verify(() -> {
+ assertThat(resp.statusCode()).isEqualTo(416);
+ assertThat(resp.statusMessage()).isEqualTo("Requested
Range Not Satisfiable");
+ assertThat(resp.bodyAsJsonObject().getString("message"))
+ .isEqualTo("Range does not satisfy boundary
requirements. range=[4, 3]");
+ });
+ client.close();
+ context.completeNow();
+ }));
+ spinAssertCdcRawTempEmpty();
+ }
+
+ @Test
+ void testPartialRangeStreamedSucceeds(VertxTestContext context)
+ {
+ WebClient client = WebClient.create(vertx);
+ String testRoute = "/cdc/segments/CommitLog-1-2.log";
+ client.get(server.actualPort(), "localhost", "/api/v1" + testRoute)
+ .putHeader("Range", "bytes=0-2")
+ .as(BodyCodec.buffer())
+ .expect(ResponsePredicate.SC_PARTIAL_CONTENT)
+ .send(context.succeeding(resp -> {
+ context.verify(() -> {
+
assertThat(resp.getHeader(HttpHeaderNames.CONTENT_LENGTH.toString()))
+ .withFailMessage("It should only stream to the last
flushed position: 1")
+ .isEqualTo("1");
+ // see src/test/resources/instance1/cdc_raw/CommitLog-1-2_cdc.idx
+
assertThat(resp.getHeader(HttpHeaderNames.CONTENT_RANGE.toString()))
+ .withFailMessage("It should only stream to the last
flushed position: 1")
+ .isEqualTo("bytes 0-0/1");
+ assertThat(resp.bodyAsString()).isEqualTo("x");
+ });
+ client.close();
+ context.completeNow();
+ }));
+ spinAssertCdcRawTempEmpty();
+ }
+
+ @Test
+ void
testPartialRangeStreamedForIncompleteLogFileWithoutRangeHeader(VertxTestContext
context)
+ {
+ WebClient client = WebClient.create(vertx);
+ String testRoute = "/cdc/segments/CommitLog-1-2.log";
+ client.get(server.actualPort(), "localhost", "/api/v1" + testRoute)
+ .as(BodyCodec.buffer())
+ .expect(ResponsePredicate.SC_PARTIAL_CONTENT)
+ .send(context.succeeding(resp -> {
+ context.verify(() ->
assertThat(resp.bodyAsString()).isEqualTo("x"));
+ client.close();
+ context.completeNow();
+ }));
+ spinAssertCdcRawTempEmpty();
+ }
+
+ @Test
+ void testIdxFileIsNotStreamed(VertxTestContext context)
+ {
+ WebClient client = WebClient.create(vertx);
+ String testRoute = "/cdc/segments/CommitLog-1-2_cdc.idx";
+ client.get(server.actualPort(), "localhost", "/api/v1" + testRoute)
+ .as(BodyCodec.buffer())
+ .expect(ResponsePredicate.SC_BAD_REQUEST)
+ .send(context.succeeding(resp -> {
+ context.verify(() -> {
+ assertThat(resp.statusCode()).isEqualTo(400);
+ assertThat(resp.statusMessage()).isEqualTo("Bad
Request");
+ assertThat(resp.bodyAsJsonObject().getString("message"))
+ .isEqualTo("Invalid path param for CDC segment:
CommitLog-1-2_cdc.idx");
+ });
+ client.close();
+ context.completeNow();
+ }));
+ }
+
+ private void spinAssertCdcRawTempEmpty()
+ {
+ File cdcTempDir = new File(CDC_RAW_TEMP_DIR);
+ assertThat(cdcTempDir.exists()).isTrue();
+ int attempts = 10;
+ while (attempts > 0)
+ {
+ File[] files = cdcTempDir.listFiles();
+ assertThat(files).isNotNull();
+ if (files.length > 0)
+ {
+ Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS);
+ attempts--;
+ if (attempts == 0)
+ {
+ assertThat(files.length)
+ .withFailMessage("Expect empty directory. But found those
files: " + Arrays.toString(files))
+ .isEqualTo(0);
+ }
+ }
+ else
+ {
+ break;
+ }
+ }
+ }
+}
diff --git
a/server/src/test/java/org/apache/cassandra/sidecar/routes/restore/RestoreJobProgressHandlerTest.java
b/server/src/test/java/org/apache/cassandra/sidecar/routes/restore/RestoreJobProgressHandlerTest.java
index 0ade5dc9..a91b7cb2 100644
---
a/server/src/test/java/org/apache/cassandra/sidecar/routes/restore/RestoreJobProgressHandlerTest.java
+++
b/server/src/test/java/org/apache/cassandra/sidecar/routes/restore/RestoreJobProgressHandlerTest.java
@@ -29,7 +29,6 @@ import java.util.stream.IntStream;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
-
import io.netty.handler.codec.http.HttpResponseStatus;
import io.vertx.core.AsyncResult;
import io.vertx.core.buffer.Buffer;
diff --git a/server/src/test/resources/config/sidecar_cdc.yaml
b/server/src/test/resources/config/sidecar_cdc.yaml
new file mode 100644
index 00000000..5116049b
--- /dev/null
+++ b/server/src/test/resources/config/sidecar_cdc.yaml
@@ -0,0 +1,73 @@
+#
+# Cassandra SideCar configuration file
+#
+cassandra:
+ host: localhost
+ port: 9042
+ username: cassandra
+ password: cassandra
+ data_dirs:
+ - /ccm/test/node1/data0
+ - /ccm/test/node1/data1
+ staging_dir: /ccm/test/node1/sstable-staging
+ jmx_host: 127.0.0.1
+ jmx_port: 7199
+ jmx_role: controlRole
+ jmx_role_password: controlPassword
+ jmx_ssl_enabled: true
+
+sidecar:
+ host: 0.0.0.0
+ port: 0 # bind server to the first available port
+ request_idle_timeout_millis: 300000 # this field expects integer value
+ request_timeout_millis: 300000
+ tcp_keep_alive: false
+ accept_backlog: 1024
+ server_verticle_instances: 2
+ throttle:
+ stream_requests_per_sec: 5000
+ timeout_sec: 10
+ traffic_shaping:
+ inbound_global_bandwidth_bps: 500
+ outbound_global_bandwidth_bps: 1500
+ peak_outbound_global_bandwidth_bps: 2000
+ max_delay_to_wait_millis: 2500
+ check_interval_for_stats_millis: 3000
+ sstable_upload:
+ concurrent_upload_limit: 80
+ min_free_space_percent: 10
+ allowable_time_skew_in_minutes: 60
+ sstable_import:
+ poll_interval_millis: 100
+ cache:
+ expire_after_access_millis: 7200000 # 2 hours
+ maximum_size: 10000
+ sstable_snapshot:
+ snapshot_list_cache:
+ expire_after_access_millis: 350
+ maximum_size: 450
+ worker_pools:
+ service:
+ name: "sidecar-worker-pool"
+ size: 20
+ max_execution_time_millis: 60000 # 60 seconds
+ internal:
+ name: "sidecar-internal-worker-pool"
+ size: 20
+ max_execution_time_millis: 900000 # 15 minutes
+ cdc:
+ segment_hardlink_cache_expiry_in_secs: 60 # 1 minute
+ jmx:
+ max_retries: 42
+ retry_delay_millis: 1234
+ schema:
+ is_enabled: false
+ keyspace: sidecar_internal
+ replication_strategy: SimpleStrategy
+ replication_factor: 1
+
+vertx:
+ filesystem_options:
+ classpath_resolving_enabled: true
+ file_cache_dir: /path/to/vertx/cache
+ file_caching_enabled: true
diff --git a/server/src/test/resources/instance1/cdc_raw/CommitLog-1-1.log
b/server/src/test/resources/instance1/cdc_raw/CommitLog-1-1.log
new file mode 100644
index 00000000..c1b0730e
--- /dev/null
+++ b/server/src/test/resources/instance1/cdc_raw/CommitLog-1-1.log
@@ -0,0 +1 @@
+x
\ No newline at end of file
diff --git a/server/src/test/resources/instance1/cdc_raw/CommitLog-1-1_cdc.idx
b/server/src/test/resources/instance1/cdc_raw/CommitLog-1-1_cdc.idx
new file mode 100644
index 00000000..70edf50b
--- /dev/null
+++ b/server/src/test/resources/instance1/cdc_raw/CommitLog-1-1_cdc.idx
@@ -0,0 +1,2 @@
+1
+COMPLETED
\ No newline at end of file
diff --git a/server/src/test/resources/instance1/cdc_raw/CommitLog-1-2.log
b/server/src/test/resources/instance1/cdc_raw/CommitLog-1-2.log
new file mode 100644
index 00000000..5ee608e7
--- /dev/null
+++ b/server/src/test/resources/instance1/cdc_raw/CommitLog-1-2.log
@@ -0,0 +1 @@
+xxxx
\ No newline at end of file
diff --git a/server/src/test/resources/instance1/cdc_raw/CommitLog-1-2_cdc.idx
b/server/src/test/resources/instance1/cdc_raw/CommitLog-1-2_cdc.idx
new file mode 100644
index 00000000..56a6051c
--- /dev/null
+++ b/server/src/test/resources/instance1/cdc_raw/CommitLog-1-2_cdc.idx
@@ -0,0 +1 @@
+1
\ No newline at end of file
diff --git a/server/src/test/resources/instance1/cdc_raw/CommitLog-1-3.log
b/server/src/test/resources/instance1/cdc_raw/CommitLog-1-3.log
new file mode 100644
index 00000000..ac8522fb
--- /dev/null
+++ b/server/src/test/resources/instance1/cdc_raw/CommitLog-1-3.log
@@ -0,0 +1 @@
+xxx
\ No newline at end of file
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]