This is an automated email from the ASF dual-hosted git repository.

ycai pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra-sidecar.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 8d88fba7 CASSSIDECAR-158: Adding APIs required for CDC (#147)
8d88fba7 is described below

commit 8d88fba753db9a464bb0b562b87bd0d5b6271c69
Author: Jyothsna konisa <[email protected]>
AuthorDate: Mon Dec 2 15:28:18 2024 -0800

    CASSSIDECAR-158: Adding APIs required for CDC (#147)
    
    Patch by Jyothsna Konisa, Saranya Krishnakumar, Yifan Cai; Reviewed by 
Bernardo Botella, Francisco Guerrero, James Berragan, Yifan Cai for 
CASSSIDECAR-158
    
    Co-authored-by: Jyothsna Konisa <[email protected]>
    Co-authored-by: Saranya Krishnakumar <[email protected]>
    Co-authored-by: Yifan Cai <[email protected]>
---
 .../cassandra/sidecar/common/ApiEndpointsV1.java   |   7 +
 .../common/response/ListCdcSegmentsResponse.java   |  68 +++++
 .../common/response/data/CdcSegmentInfo.java       |  48 ++++
 .../cassandra/sidecar/common/utils/HttpRange.java  |   6 +-
 .../sidecar/common/utils/Preconditions.java        |  31 +++
 .../response/ListCdcSegmentsResponseTest.java      |  56 ++++
 .../sidecar/common/utils/HttpRangeTest.java        |   4 +-
 server/src/main/dist/conf/sidecar.yaml             |   5 +
 .../apache/cassandra/sidecar/cdc/CdcLogCache.java  | 205 +++++++++++++++
 .../sidecar/cluster/instance/InstanceMetadata.java |   5 +
 .../cluster/instance/InstanceMetadataImpl.java     |  21 ++
 .../cassandra/sidecar/config/CdcConfiguration.java |  29 +++
 .../sidecar/config/InstanceConfiguration.java      |   5 +
 .../sidecar/config/ServiceConfiguration.java       |   5 +
 .../sidecar/config/SidecarConfiguration.java       |   1 +
 .../sidecar/config/yaml/CdcConfigurationImpl.java  |  50 ++++
 .../config/yaml/InstanceConfigurationImpl.java     |  16 ++
 .../config/yaml/ServiceConfigurationImpl.java      |  31 +++
 .../sidecar/routes/cdc/ListCdcDirHandler.java      | 149 +++++++++++
 .../routes/cdc/StreamCdcSegmentHandler.java        | 196 ++++++++++++++
 .../cassandra/sidecar/server/MainModule.java       |  11 +
 .../apache/cassandra/sidecar/utils/CdcUtil.java    | 144 +++++++++++
 .../cassandra/sidecar/utils/FileStreamer.java      |   5 +-
 .../org/apache/cassandra/sidecar/TestModule.java   |  10 +-
 .../cassandra/sidecar/cdc/CDCLogCacheTest.java     | 143 +++++++++++
 .../sidecar/config/SidecarConfigurationTest.java   |   9 +
 .../sidecar/routes/cdc/ListCdcDirHandlerTest.java  | 132 ++++++++++
 .../routes/cdc/StreamCdcSegmentHandlerTest.java    | 286 +++++++++++++++++++++
 .../restore/RestoreJobProgressHandlerTest.java     |   1 -
 server/src/test/resources/config/sidecar_cdc.yaml  |  73 ++++++
 .../resources/instance1/cdc_raw/CommitLog-1-1.log  |   1 +
 .../instance1/cdc_raw/CommitLog-1-1_cdc.idx        |   2 +
 .../resources/instance1/cdc_raw/CommitLog-1-2.log  |   1 +
 .../instance1/cdc_raw/CommitLog-1-2_cdc.idx        |   1 +
 .../resources/instance1/cdc_raw/CommitLog-1-3.log  |   1 +
 35 files changed, 1750 insertions(+), 8 deletions(-)

diff --git 
a/client-common/src/main/java/org/apache/cassandra/sidecar/common/ApiEndpointsV1.java
 
b/client-common/src/main/java/org/apache/cassandra/sidecar/common/ApiEndpointsV1.java
index ec4c7b7e..be49b511 100644
--- 
a/client-common/src/main/java/org/apache/cassandra/sidecar/common/ApiEndpointsV1.java
+++ 
b/client-common/src/main/java/org/apache/cassandra/sidecar/common/ApiEndpointsV1.java
@@ -109,6 +109,13 @@ public final class ApiEndpointsV1
     public static final String ABORT_RESTORE_JOB_ROUTE = RESTORE_JOB_ROUTE + 
ABORT;
     public static final String RESTORE_JOB_PROGRESS_ROUTE = RESTORE_JOB_ROUTE 
+ PROGRESS;
 
+    // CDC APIs
+    public static final String CDC_PATH = "/cdc";
+    public static final String SEGMENT_PATH_PARAM = ":segment";
+    public static final String LIST_CDC_SEGMENTS_ROUTE = API_V1 + CDC_PATH + 
"/segments";
+    public static final String STREAM_CDC_SEGMENTS_ROUTE = 
LIST_CDC_SEGMENTS_ROUTE + "/" + SEGMENT_PATH_PARAM;
+
+
     public static final String CONNECTED_CLIENT_STATS_ROUTE = API_V1 + 
CASSANDRA + "/stats/connected-clients";
 
     private ApiEndpointsV1()
diff --git 
a/client-common/src/main/java/org/apache/cassandra/sidecar/common/response/ListCdcSegmentsResponse.java
 
b/client-common/src/main/java/org/apache/cassandra/sidecar/common/response/ListCdcSegmentsResponse.java
new file mode 100644
index 00000000..8b72cb2b
--- /dev/null
+++ 
b/client-common/src/main/java/org/apache/cassandra/sidecar/common/response/ListCdcSegmentsResponse.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.common.response;
+
+import java.util.Collections;
+import java.util.List;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.cassandra.sidecar.common.response.data.CdcSegmentInfo;
+
+/**
+ * A class representing a response for a list CDC Segment request
+ */
+@JsonIgnoreProperties(ignoreUnknown = true)
+@JsonInclude(JsonInclude.Include.NON_NULL)
+public class ListCdcSegmentsResponse
+{
+    private final String host;
+    private final int port;
+    private final List<CdcSegmentInfo> segmentInfos;
+
+    @JsonCreator
+    public ListCdcSegmentsResponse(@JsonProperty("host") String host,
+                                   @JsonProperty("port") int port,
+                                   @JsonProperty("segmentInfos") 
List<CdcSegmentInfo> segmentsInfo)
+    {
+        this.host = host;
+        this.port = port;
+        this.segmentInfos = Collections.unmodifiableList(segmentsInfo);
+    }
+
+    @JsonProperty("host")
+    public String host()
+    {
+        return host;
+    }
+
+    @JsonProperty("port")
+    public int port()
+    {
+        return port;
+    }
+
+    @JsonProperty("segmentInfos")
+    public List<CdcSegmentInfo> segmentInfos()
+    {
+        return segmentInfos;
+    }
+}
diff --git 
a/client-common/src/main/java/org/apache/cassandra/sidecar/common/response/data/CdcSegmentInfo.java
 
b/client-common/src/main/java/org/apache/cassandra/sidecar/common/response/data/CdcSegmentInfo.java
new file mode 100644
index 00000000..1745fa9d
--- /dev/null
+++ 
b/client-common/src/main/java/org/apache/cassandra/sidecar/common/response/data/CdcSegmentInfo.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.common.response.data;
+
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+/**
+ * Class representing segment information.
+ */
+@JsonIgnoreProperties(ignoreUnknown = true)
+@JsonInclude(JsonInclude.Include.NON_NULL)
+public class CdcSegmentInfo
+{
+    public final String name;
+    public final long size;
+    public final long idx;
+    public final boolean completed;
+    public final long lastModifiedTimestamp;
+
+    public CdcSegmentInfo(@JsonProperty("name") String name, 
@JsonProperty("size") long size,
+                          @JsonProperty("idx") long idx, 
@JsonProperty("completed") boolean completed,
+                          @JsonProperty("lastModifiedTimestamp") long 
lastModifiedTimestamp)
+    {
+        this.name = name;
+        this.size = size;
+        this.idx = idx;
+        this.completed = completed;
+        this.lastModifiedTimestamp = lastModifiedTimestamp;
+    }
+}
diff --git 
a/client-common/src/main/java/org/apache/cassandra/sidecar/common/utils/HttpRange.java
 
b/client-common/src/main/java/org/apache/cassandra/sidecar/common/utils/HttpRange.java
index a4039da4..9f155aa8 100644
--- 
a/client-common/src/main/java/org/apache/cassandra/sidecar/common/utils/HttpRange.java
+++ 
b/client-common/src/main/java/org/apache/cassandra/sidecar/common/utils/HttpRange.java
@@ -44,8 +44,10 @@ public class HttpRange
     // An initialized range is always valid; invalid params fail range 
initialization.
     private HttpRange(final long start, final long end)
     {
-        Preconditions.checkArgument(start >= 0, "Range start can not be 
negative");
-        Preconditions.checkArgument(end >= start, "Range does not satisfy 
boundary requirements");
+        Preconditions.checkArgument(start >= 0,
+                                    () -> String.format("Range start can not 
be negative. range=[%s, %s]", start, end));
+        Preconditions.checkArgument(end >= start,
+                                    () -> String.format("Range does not 
satisfy boundary requirements. range=[%s, %s]", start, end));
         this.start = start;
         this.end = end;
         long len = end - start + 1; // Assign long max if overflows
diff --git 
a/client-common/src/main/java/org/apache/cassandra/sidecar/common/utils/Preconditions.java
 
b/client-common/src/main/java/org/apache/cassandra/sidecar/common/utils/Preconditions.java
index f6445b01..ca845d30 100644
--- 
a/client-common/src/main/java/org/apache/cassandra/sidecar/common/utils/Preconditions.java
+++ 
b/client-common/src/main/java/org/apache/cassandra/sidecar/common/utils/Preconditions.java
@@ -18,6 +18,8 @@
 
 package org.apache.cassandra.sidecar.common.utils;
 
+import java.util.function.Supplier;
+
 /**
  * A class to prevent introducing a dependency to guava in common and client
  */
@@ -39,6 +41,21 @@ public class Preconditions
         }
     }
 
+    /**
+     * Similar to {@link #checkArgument(boolean, String)}, but allows to 
evaluate the error message lazily
+     *
+     * @param validCondition the condition to evaluate
+     * @param errorMessageSupplier supplies the error message to use for the 
{@link IllegalArgumentException}
+     * @throws IllegalArgumentException when the condition is not valid (i.e. 
{@code false})
+     */
+    public static void checkArgument(boolean validCondition, Supplier<String> 
errorMessageSupplier)
+    {
+        if (!validCondition)
+        {
+            throw new IllegalArgumentException(errorMessageSupplier.get());
+        }
+    }
+
     /**
      * Throws an {@link IllegalStateException} when the {@code validCondition} 
is {@code false}, otherwise
      * no action is taken.
@@ -53,4 +70,18 @@ public class Preconditions
             throw new IllegalStateException(errorMessage);
         }
     }
+
+    /**
+     * Similar to {@link #checkState(boolean, String)}, but allows to evaluate 
the error message lazily
+     *
+     * @param validCondition the condition to evaluate
+     * @param errorMessageSupplier supplies the error message to use for the 
{@link IllegalStateException}
+     */
+    public static void checkState(boolean validCondition, Supplier<String> 
errorMessageSupplier)
+    {
+        if (!validCondition)
+        {
+            throw new IllegalStateException(errorMessageSupplier.get());
+        }
+    }
 }
diff --git 
a/client-common/src/test/java/org/apache/cassandra/sidecar/common/response/ListCdcSegmentsResponseTest.java
 
b/client-common/src/test/java/org/apache/cassandra/sidecar/common/response/ListCdcSegmentsResponseTest.java
new file mode 100644
index 00000000..d057c55d
--- /dev/null
+++ 
b/client-common/src/test/java/org/apache/cassandra/sidecar/common/response/ListCdcSegmentsResponseTest.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.common.response;
+
+import java.util.Arrays;
+import java.util.List;
+
+import org.junit.jupiter.api.Test;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.cassandra.sidecar.common.response.data.CdcSegmentInfo;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * Test {@link ListCdcSegmentsResponse}
+ */
+class ListCdcSegmentsResponseTest
+{
+    @Test
+    void testSerDeser() throws Exception
+    {
+        List<CdcSegmentInfo> segments = Arrays.asList(new 
CdcSegmentInfo("commit-log1", 100, 100, true, 1732148713725L),
+                                                      new 
CdcSegmentInfo("commit-log2", 100, 10, false, 1732148713725L));
+        ListCdcSegmentsResponse response = new 
ListCdcSegmentsResponse("localhost", 9043, segments);
+        ObjectMapper mapper = new ObjectMapper();
+        String json = mapper.writeValueAsString(response);
+        assertThat(json).isEqualTo("{\"host\":\"localhost\"," +
+                                   "\"port\":9043," +
+                                   "\"segmentInfos\":[" +
+                                   
"{\"name\":\"commit-log1\",\"size\":100,\"idx\":100,\"completed\":true,\"lastModifiedTimestamp\":1732148713725},"
 +
+                                   
"{\"name\":\"commit-log2\",\"size\":100,\"idx\":10,\"completed\":false,\"lastModifiedTimestamp\":1732148713725}]}");
+        ListCdcSegmentsResponse deserialized = mapper.readValue(json, 
ListCdcSegmentsResponse.class);
+        assertThat(deserialized.host()).isEqualTo("localhost");
+        assertThat(deserialized.port()).isEqualTo(9043);
+        assertThat(deserialized.segmentInfos()).hasSize(2);
+        
assertThat(deserialized.segmentInfos().get(0).name).isEqualTo("commit-log1");
+        
assertThat(deserialized.segmentInfos().get(1).name).isEqualTo("commit-log2");
+    }
+}
diff --git 
a/client-common/src/test/java/org/apache/cassandra/sidecar/common/utils/HttpRangeTest.java
 
b/client-common/src/test/java/org/apache/cassandra/sidecar/common/utils/HttpRangeTest.java
index efc96130..4ad8bdba 100644
--- 
a/client-common/src/test/java/org/apache/cassandra/sidecar/common/utils/HttpRangeTest.java
+++ 
b/client-common/src/test/java/org/apache/cassandra/sidecar/common/utils/HttpRangeTest.java
@@ -96,7 +96,7 @@ public class HttpRangeTest
         {
             HttpRange.parseHeader(rangeHeader, Long.MAX_VALUE);
         });
-        String msg = "Range does not satisfy boundary requirements";
+        String msg = "Range does not satisfy boundary requirements. 
range=[9223372036854775807, 9223372036854775806]";
         assertEquals(msg, thrownException.getMessage());
     }
 
@@ -108,7 +108,7 @@ public class HttpRangeTest
         {
             HttpRange.parseHeader(rangeHeader, Long.MAX_VALUE);
         });
-        String msg = "Range does not satisfy boundary requirements";
+        String msg = "Range does not satisfy boundary requirements. range=[9, 
2]";
         assertEquals(msg, thrownException.getMessage());
     }
 
diff --git a/server/src/main/dist/conf/sidecar.yaml 
b/server/src/main/dist/conf/sidecar.yaml
index 8fe0d742..1588bcf2 100644
--- a/server/src/main/dist/conf/sidecar.yaml
+++ b/server/src/main/dist/conf/sidecar.yaml
@@ -26,6 +26,7 @@ cassandra_instances:
     data_dirs:
       - ~/.ccm/test/node1/data0
     staging_dir: ~/.ccm/test/node1/sstable-staging
+    cdc_dir: ~/.ccm/test/node1/cdc_raw
     jmx_host: 127.0.0.1
     jmx_port: 7100
     jmx_ssl_enabled: false
@@ -37,6 +38,7 @@ cassandra_instances:
     data_dirs:
       - ~/.ccm/test/node2/data0
     staging_dir: ~/.ccm/test/node2/sstable-staging
+    cdc_dir: ~/.ccm/test/node2/cdc_raw
     jmx_host: 127.0.0.1
     jmx_port: 7200
     jmx_ssl_enabled: false
@@ -48,6 +50,7 @@ cassandra_instances:
     data_dirs:
       - ~/.ccm/test/node3/data0
     staging_dir: ~/.ccm/test/node3/sstable-staging
+    cdc_dir: ~/.ccm/test/node3/cdc_raw
     jmx_host: 127.0.0.1
     jmx_port: 7300
     jmx_ssl_enabled: false
@@ -86,6 +89,8 @@ sidecar:
     snapshot_list_cache:
       expire_after_access_millis: 7200000 # 2 hours
       maximum_size: 10000
+  cdc:
+    segment_hardlink_cache_expiry_in_secs: 300 # 5 mins
   worker_pools:
     service:
       name: "sidecar-worker-pool"
diff --git 
a/server/src/main/java/org/apache/cassandra/sidecar/cdc/CdcLogCache.java 
b/server/src/main/java/org/apache/cassandra/sidecar/cdc/CdcLogCache.java
new file mode 100644
index 00000000..eb1fb894
--- /dev/null
+++ b/server/src/main/java/org/apache/cassandra/sidecar/cdc/CdcLogCache.java
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.cdc;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.FileAlreadyExistsException;
+import java.nio.file.Files;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.RemovalListener;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import org.apache.cassandra.sidecar.cluster.InstancesConfig;
+import org.apache.cassandra.sidecar.cluster.instance.InstanceMetadata;
+import org.apache.cassandra.sidecar.concurrent.ExecutorPools;
+import org.apache.cassandra.sidecar.concurrent.TaskExecutorPool;
+import org.apache.cassandra.sidecar.config.SidecarConfiguration;
+import org.apache.cassandra.sidecar.utils.CdcUtil;
+
+/**
+ * CDCLogCache caches the recently downloaded files to avoid being deleted by 
accident.
+ * <br>
+ * Downloads tracking is via {@linkplain #touch}.
+ * <br>
+ * In the event of deleting the _consumed_ files, 1 supersedes 2, meaning the 
_consumed_ files and their links
+ * are deleted, even though within the cache duration.
+ */
+@Singleton
+public class CdcLogCache
+{
+    public static final String TEMP_DIR_SUFFIX = "_tmp";
+
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(CdcLogCache.class);
+    private static final RemovalListener<File, File> hardlinkRemover = 
notification -> deleteFileIfExist(notification.getValue());
+
+    private final AtomicBoolean isInitialized = new AtomicBoolean(false);
+    private final TaskExecutorPool internalExecutorPool;
+    private final long cacheExpiryInMillis;
+
+    // Cache for the hardlinks. Key: origin file; Value: link file
+    // The entries expire after 5 minutes
+    @VisibleForTesting
+    final Cache<File, File> hardlinkCache;
+
+    @Inject
+    public CdcLogCache(ExecutorPools executorPools,
+                       InstancesConfig instancesConfig,
+                       SidecarConfiguration sidecarConfig)
+    {
+        this(executorPools, instancesConfig,
+             TimeUnit.SECONDS.toMillis(sidecarConfig.serviceConfiguration()
+                                                    .cdcConfiguration()
+                                                    
.segmentHardlinkCacheExpiryInSecs()));
+    }
+
+    @VisibleForTesting
+    CdcLogCache(ExecutorPools executorPools,
+                InstancesConfig instancesConfig,
+                long cacheExpiryInMillis)
+    {
+        this.cacheExpiryInMillis = cacheExpiryInMillis;
+        this.internalExecutorPool = executorPools.internal();
+        this.hardlinkCache = CacheBuilder.newBuilder()
+                                         
.expireAfterAccess(cacheExpiryInMillis, TimeUnit.MILLISECONDS)
+                                         .removalListener(hardlinkRemover)
+                                         .build();
+        // Run cleanup in the internal pool to mute any exceptions. The 
cleanup is best-effort.
+        internalExecutorPool.runBlocking(() -> 
cleanupLinkedFilesOnStartup(instancesConfig));
+    }
+
+    public void initMaybe()
+    {
+        if (isInitialized.get())
+        {
+            return;
+        }
+
+        if (isInitialized.compareAndSet(false, true))
+        {
+            // setup periodic and serial cleanup
+            internalExecutorPool.setPeriodic(cacheExpiryInMillis,
+                                             id -> hardlinkCache.cleanUp(),
+                                             true);
+        }
+    }
+
+    public void touch(File segmentFile, File indexFile)
+    {
+        // renew the hardlinks
+        hardlinkCache.getIfPresent(segmentFile);
+        hardlinkCache.getIfPresent(indexFile);
+    }
+
+    /**
+     * Create a hardlink from origin in the cache if the hardlink does not 
exist yet.
+     *
+     * @param origin the source file
+     * @return the link file
+     * @throws IOException when an IO exception occurs during link
+     */
+    public File createLinkedFileInCache(File origin) throws IOException
+    {
+        File link = hardlinkCache.getIfPresent(origin);
+        if (link == null)
+        {
+            link = new File(getTempCdcDir(origin.getParent()), 
origin.getName());
+            try
+            {
+                // create link and cache it
+                Files.createLink(link.toPath(), origin.toPath());
+                hardlinkCache.put(origin, link);
+            }
+            catch (FileAlreadyExistsException e)
+            {
+                LOGGER.debug("The target of hardlink {} already exists. It 
could be created by a concurrent request.", link);
+            }
+        }
+        return link;
+    }
+
+    /**
+     * Clean up the linked file when the application is starting.
+     * There could be files left over if the application crashes during 
streaming the CDC segments.
+     * On a new start, the tmp directory for the linked CDC segments should be 
empty.
+     * It is only called in the constructor of the handler singleton.
+     *
+     * @param config instances config
+     */
+    @VisibleForTesting
+    public void cleanupLinkedFilesOnStartup(InstancesConfig config)
+    {
+        for (InstanceMetadata instance : config.instances())
+        {
+            try
+            {
+                cleanupLinkedFiles(instance);
+            }
+            catch (Exception e)
+            {
+                LOGGER.warn("Failed to clean up linked files for instance {}", 
instance.id(), e);
+            }
+        }
+    }
+
+    private void cleanupLinkedFiles(InstanceMetadata instance) throws 
IOException
+    {
+        File[] files = getTempCdcDir((instance.cdcDir()))
+                       .listFiles(f -> CdcUtil.isLogFile(f.getName()) || 
CdcUtil.isIndexFile(f.getName()));
+        if (files == null)
+            return;
+        for (File f : files)
+        {
+            deleteFileIfExist(f);
+        }
+    }
+
+    private static void deleteFileIfExist(File file)
+    {
+        if (file.exists() && file.isFile())
+        {
+            if (file.delete())
+            {
+                LOGGER.debug("Removed the link file={}", file);
+            }
+        }
+    }
+
+    private static File getTempCdcDir(String cdcDir) throws IOException
+    {
+        File dir = new File(cdcDir + TEMP_DIR_SUFFIX);
+        try
+        {
+            Files.createDirectories(dir.toPath());
+        }
+        catch (IOException e)
+        {
+            LOGGER.error("Unable to create temporary CDC directory {}", dir, 
e);
+            throw e;
+        }
+        return dir;
+    }
+}
diff --git 
a/server/src/main/java/org/apache/cassandra/sidecar/cluster/instance/InstanceMetadata.java
 
b/server/src/main/java/org/apache/cassandra/sidecar/cluster/instance/InstanceMetadata.java
index b3860e13..5d5ff5c0 100644
--- 
a/server/src/main/java/org/apache/cassandra/sidecar/cluster/instance/InstanceMetadata.java
+++ 
b/server/src/main/java/org/apache/cassandra/sidecar/cluster/instance/InstanceMetadata.java
@@ -56,6 +56,11 @@ public interface InstanceMetadata
      */
     String stagingDir();
 
+    /**
+     * @return cdc directory of the cassandra instance
+     */
+    String cdcDir();
+
     /**
      * @return a {@link CassandraAdapterDelegate} specific for the instance
      */
diff --git 
a/server/src/main/java/org/apache/cassandra/sidecar/cluster/instance/InstanceMetadataImpl.java
 
b/server/src/main/java/org/apache/cassandra/sidecar/cluster/instance/InstanceMetadataImpl.java
index 21cac7d9..c3cded7e 100644
--- 
a/server/src/main/java/org/apache/cassandra/sidecar/cluster/instance/InstanceMetadataImpl.java
+++ 
b/server/src/main/java/org/apache/cassandra/sidecar/cluster/instance/InstanceMetadataImpl.java
@@ -42,6 +42,7 @@ public class InstanceMetadataImpl implements InstanceMetadata
     private final int port;
     private final List<String> dataDirs;
     private final String stagingDir;
+    private final String cdcDir;
     @Nullable
     private final CassandraAdapterDelegate delegate;
     private final InstanceMetrics metrics;
@@ -55,6 +56,7 @@ public class InstanceMetadataImpl implements InstanceMetadata
                                    .map(FileUtils::maybeResolveHomeDirectory)
                                    
.collect(Collectors.collectingAndThen(Collectors.toList(), 
Collections::unmodifiableList));
         stagingDir = FileUtils.maybeResolveHomeDirectory(builder.stagingDir);
+        cdcDir = builder().cdcDir;
         delegate = builder.delegate;
         metrics = builder.metrics;
     }
@@ -89,6 +91,12 @@ public class InstanceMetadataImpl implements InstanceMetadata
         return stagingDir;
     }
 
+    @Override
+    public String cdcDir()
+    {
+        return cdcDir;
+    }
+
     @Override
     public @Nullable CassandraAdapterDelegate delegate()
     {
@@ -116,6 +124,7 @@ public class InstanceMetadataImpl implements 
InstanceMetadata
         protected int port;
         protected List<String> dataDirs;
         protected String stagingDir;
+        protected String cdcDir;
         protected CassandraAdapterDelegate delegate;
         protected MetricRegistry metricRegistry;
         protected InstanceMetrics metrics;
@@ -131,6 +140,7 @@ public class InstanceMetadataImpl implements 
InstanceMetadata
             port = instanceMetadata.port;
             dataDirs = new ArrayList<>(instanceMetadata.dataDirs);
             stagingDir = instanceMetadata.stagingDir;
+            cdcDir = instanceMetadata.cdcDir;
             delegate = instanceMetadata.delegate;
             metrics = instanceMetadata.metrics;
         }
@@ -196,6 +206,17 @@ public class InstanceMetadataImpl implements 
InstanceMetadata
             return update(b -> b.stagingDir = stagingDir);
         }
 
+        /**
+         * Sets the {@code cdcDir} and returns a reference to this Builder 
enabling method chaining.
+         *
+         * @param cdcDir the {@code cdcDir} to set
+         * @return a reference to this Builder
+         */
+        public Builder cdcDir(String cdcDir)
+        {
+            return update(b -> b.cdcDir = cdcDir);
+        }
+
         /**
          * Sets the {@code delegate} and returns a reference to this Builder 
enabling method chaining.
          *
diff --git 
a/server/src/main/java/org/apache/cassandra/sidecar/config/CdcConfiguration.java
 
b/server/src/main/java/org/apache/cassandra/sidecar/config/CdcConfiguration.java
new file mode 100644
index 00000000..517c6b40
--- /dev/null
+++ 
b/server/src/main/java/org/apache/cassandra/sidecar/config/CdcConfiguration.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.sidecar.config;
+
+/**
+ * This class encapsulates configuration values for cdc.
+ */
+public interface CdcConfiguration
+{
+    /**
+     * @return segment hard link cache expiration time in seconds used in 
{@link org.apache.cassandra.sidecar.cdc.CdcLogCache}
+     */
+    long segmentHardlinkCacheExpiryInSecs();
+}
diff --git 
a/server/src/main/java/org/apache/cassandra/sidecar/config/InstanceConfiguration.java
 
b/server/src/main/java/org/apache/cassandra/sidecar/config/InstanceConfiguration.java
index 12108a55..ca4e2892 100644
--- 
a/server/src/main/java/org/apache/cassandra/sidecar/config/InstanceConfiguration.java
+++ 
b/server/src/main/java/org/apache/cassandra/sidecar/config/InstanceConfiguration.java
@@ -50,6 +50,11 @@ public interface InstanceConfiguration
      */
     String stagingDir();
 
+    /**
+     * @return cdc directory of the cassandra instance
+     */
+    String cdcDir();
+
     /**
      * @return the host address of the JMX service for the Cassandra instance
      */
diff --git 
a/server/src/main/java/org/apache/cassandra/sidecar/config/ServiceConfiguration.java
 
b/server/src/main/java/org/apache/cassandra/sidecar/config/ServiceConfiguration.java
index b47caf3a..a5150f79 100644
--- 
a/server/src/main/java/org/apache/cassandra/sidecar/config/ServiceConfiguration.java
+++ 
b/server/src/main/java/org/apache/cassandra/sidecar/config/ServiceConfiguration.java
@@ -144,4 +144,9 @@ public interface ServiceConfiguration
      * @return the configuration for sidecar schema
      */
     SchemaKeyspaceConfiguration schemaKeyspaceConfiguration();
+
+    /**
+     * @return the configuration for cdc
+     */
+    CdcConfiguration cdcConfiguration();
 }
diff --git 
a/server/src/main/java/org/apache/cassandra/sidecar/config/SidecarConfiguration.java
 
b/server/src/main/java/org/apache/cassandra/sidecar/config/SidecarConfiguration.java
index 7babd0e7..736415f6 100644
--- 
a/server/src/main/java/org/apache/cassandra/sidecar/config/SidecarConfiguration.java
+++ 
b/server/src/main/java/org/apache/cassandra/sidecar/config/SidecarConfiguration.java
@@ -87,4 +87,5 @@ public interface SidecarConfiguration
      * @return the configuration for vert.x
      */
     @Nullable VertxConfiguration vertxConfiguration();
+
 }
diff --git 
a/server/src/main/java/org/apache/cassandra/sidecar/config/yaml/CdcConfigurationImpl.java
 
b/server/src/main/java/org/apache/cassandra/sidecar/config/yaml/CdcConfigurationImpl.java
new file mode 100644
index 00000000..eaccd1e7
--- /dev/null
+++ 
b/server/src/main/java/org/apache/cassandra/sidecar/config/yaml/CdcConfigurationImpl.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.sidecar.config.yaml;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.cassandra.sidecar.config.CdcConfiguration;
+
+/**
+ * Encapsulate configuration values for CDC
+ */
+public class CdcConfigurationImpl implements CdcConfiguration
+{
+    public static final String SEGMENT_HARD_LINK_CACHE_EXPIRY_IN_SECS_PROPERTY 
= "segment_hardlink_cache_expiry_in_secs";
+    public static final long DEFAULT_SEGMENT_HARD_LINK_CACHE_EXPIRY_IN_SECS = 
300;
+
+    @JsonProperty(value = SEGMENT_HARD_LINK_CACHE_EXPIRY_IN_SECS_PROPERTY, 
defaultValue = DEFAULT_SEGMENT_HARD_LINK_CACHE_EXPIRY_IN_SECS + "")
+    protected final long segmentHardLinkCacheExpiryInSecs;
+
+    public CdcConfigurationImpl()
+    {
+        this.segmentHardLinkCacheExpiryInSecs = 
DEFAULT_SEGMENT_HARD_LINK_CACHE_EXPIRY_IN_SECS;
+    }
+
+    public CdcConfigurationImpl(long segmentHardLinkCacheExpiryInSecs)
+    {
+        this.segmentHardLinkCacheExpiryInSecs = 
segmentHardLinkCacheExpiryInSecs;
+    }
+
+    @Override
+    @JsonProperty(value = SEGMENT_HARD_LINK_CACHE_EXPIRY_IN_SECS_PROPERTY)
+    public long segmentHardlinkCacheExpiryInSecs()
+    {
+        return segmentHardLinkCacheExpiryInSecs;
+    }
+}
diff --git 
a/server/src/main/java/org/apache/cassandra/sidecar/config/yaml/InstanceConfigurationImpl.java
 
b/server/src/main/java/org/apache/cassandra/sidecar/config/yaml/InstanceConfigurationImpl.java
index 891ae0b7..6d544b92 100644
--- 
a/server/src/main/java/org/apache/cassandra/sidecar/config/yaml/InstanceConfigurationImpl.java
+++ 
b/server/src/main/java/org/apache/cassandra/sidecar/config/yaml/InstanceConfigurationImpl.java
@@ -44,6 +44,9 @@ public class InstanceConfigurationImpl implements 
InstanceConfiguration
     @JsonProperty("staging_dir")
     protected final String stagingDir;
 
+    @JsonProperty("cdc_dir")
+    protected final String cdcDir;
+
     @JsonProperty("jmx_host")
     protected final String jmxHost;
 
@@ -66,6 +69,7 @@ public class InstanceConfigurationImpl implements 
InstanceConfiguration
         this.port = 9042;
         this.dataDirs = null;
         this.stagingDir = null;
+        this.cdcDir = null;
         this.jmxHost = null;
         this.jmxPort = 0;
         this.jmxSslEnabled = false;
@@ -78,6 +82,7 @@ public class InstanceConfigurationImpl implements 
InstanceConfiguration
                                      int port,
                                      List<String> dataDirs,
                                      String stagingDir,
+                                     String cdcDir,
                                      String jmxHost,
                                      int jmxPort,
                                      boolean jmxSslEnabled,
@@ -89,6 +94,7 @@ public class InstanceConfigurationImpl implements 
InstanceConfiguration
         this.port = port;
         this.dataDirs = Collections.unmodifiableList(dataDirs);
         this.stagingDir = stagingDir;
+        this.cdcDir = cdcDir;
         this.jmxHost = jmxHost;
         this.jmxPort = jmxPort;
         this.jmxSslEnabled = jmxSslEnabled;
@@ -146,6 +152,16 @@ public class InstanceConfigurationImpl implements 
InstanceConfiguration
         return stagingDir;
     }
 
+    /**
+     * @return cdc directory of the cassandra instance
+     */
+    @Override
+    @JsonProperty("cdc_dir")
+    public String cdcDir()
+    {
+        return cdcDir;
+    }
+
     /**
      * @return the host address of the JMX service for the Cassandra instance
      */
diff --git 
a/server/src/main/java/org/apache/cassandra/sidecar/config/yaml/ServiceConfigurationImpl.java
 
b/server/src/main/java/org/apache/cassandra/sidecar/config/yaml/ServiceConfigurationImpl.java
index 36611a34..8f2c9879 100644
--- 
a/server/src/main/java/org/apache/cassandra/sidecar/config/yaml/ServiceConfigurationImpl.java
+++ 
b/server/src/main/java/org/apache/cassandra/sidecar/config/yaml/ServiceConfigurationImpl.java
@@ -25,6 +25,7 @@ import java.util.concurrent.TimeUnit;
 
 import com.fasterxml.jackson.annotation.JsonProperty;
 import org.apache.cassandra.sidecar.common.DataObjectBuilder;
+import org.apache.cassandra.sidecar.config.CdcConfiguration;
 import org.apache.cassandra.sidecar.config.JmxConfiguration;
 import org.apache.cassandra.sidecar.config.SSTableImportConfiguration;
 import org.apache.cassandra.sidecar.config.SSTableSnapshotConfiguration;
@@ -34,6 +35,7 @@ import 
org.apache.cassandra.sidecar.config.ServiceConfiguration;
 import org.apache.cassandra.sidecar.config.ThrottleConfiguration;
 import org.apache.cassandra.sidecar.config.TrafficShapingConfiguration;
 import org.apache.cassandra.sidecar.config.WorkerPoolConfiguration;
+import org.jetbrains.annotations.Nullable;
 
 /**
  * Configuration for the Sidecar Service and configuration of the REST 
endpoints in the service
@@ -64,6 +66,7 @@ public class ServiceConfigurationImpl implements 
ServiceConfiguration
     private static final String JMX_PROPERTY = "jmx";
     private static final String TRAFFIC_SHAPING_PROPERTY = "traffic_shaping";
     private static final String SCHEMA = "schema";
+    private static final String CDC = "cdc";
     protected static final Map<String, WorkerPoolConfiguration> 
DEFAULT_WORKER_POOLS_CONFIGURATION
     = Collections.unmodifiableMap(new HashMap<String, 
WorkerPoolConfiguration>()
     {{
@@ -123,6 +126,9 @@ public class ServiceConfigurationImpl implements 
ServiceConfiguration
     @JsonProperty(value = SCHEMA)
     protected final SchemaKeyspaceConfiguration schemaKeyspaceConfiguration;
 
+    @JsonProperty(value = CDC)
+    protected final CdcConfiguration cdcConfiguration;
+
     /**
      * Constructs a new {@link ServiceConfigurationImpl} with the default 
values
      */
@@ -154,6 +160,7 @@ public class ServiceConfigurationImpl implements 
ServiceConfiguration
         jmxConfiguration = builder.jmxConfiguration;
         trafficShapingConfiguration = builder.trafficShapingConfiguration;
         schemaKeyspaceConfiguration = builder.schemaKeyspaceConfiguration;
+        cdcConfiguration = builder.cdcConfiguration;
     }
 
     /**
@@ -316,6 +323,17 @@ public class ServiceConfigurationImpl implements 
ServiceConfiguration
         return schemaKeyspaceConfiguration;
     }
 
+    /**
+     * @return the configuration for cdc
+     */
+    @Override
+    @JsonProperty(value = CDC)
+    @Nullable
+    public CdcConfiguration cdcConfiguration()
+    {
+        return cdcConfiguration;
+    }
+
     public static Builder builder()
     {
         return new Builder();
@@ -343,6 +361,7 @@ public class ServiceConfigurationImpl implements 
ServiceConfiguration
         protected JmxConfiguration jmxConfiguration = new 
JmxConfigurationImpl();
         protected TrafficShapingConfiguration trafficShapingConfiguration = 
new TrafficShapingConfigurationImpl();
         protected SchemaKeyspaceConfiguration schemaKeyspaceConfiguration = 
new SchemaKeyspaceConfigurationImpl();
+        protected CdcConfiguration cdcConfiguration = new 
CdcConfigurationImpl();
 
         private Builder()
         {
@@ -533,6 +552,18 @@ public class ServiceConfigurationImpl implements 
ServiceConfiguration
             return update(b -> b.schemaKeyspaceConfiguration = 
schemaKeyspaceConfiguration);
         }
 
+        /**
+         * Set the {@code cdcConfiguration} and returns a reference to this 
Builder enabling
+         * method chaining.
+         *
+         * @param configuration th {@code cdcConfiguration} to set
+         * @return a reference to the Builder
+         */
+        public Builder cdcConfiguration(CdcConfiguration configuration)
+        {
+            return update(b -> b.cdcConfiguration = configuration);
+        }
+
         /**
          * Returns a {@code ServiceConfigurationImpl} built from the 
parameters previously set.
          *
diff --git 
a/server/src/main/java/org/apache/cassandra/sidecar/routes/cdc/ListCdcDirHandler.java
 
b/server/src/main/java/org/apache/cassandra/sidecar/routes/cdc/ListCdcDirHandler.java
new file mode 100644
index 00000000..2f7a2540
--- /dev/null
+++ 
b/server/src/main/java/org/apache/cassandra/sidecar/routes/cdc/ListCdcDirHandler.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.routes.cdc;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.nio.file.attribute.BasicFileAttributes;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import io.netty.handler.codec.http.HttpResponseStatus;
+import io.vertx.core.http.HttpServerRequest;
+import io.vertx.core.net.SocketAddress;
+import io.vertx.ext.web.RoutingContext;
+import org.apache.cassandra.sidecar.common.response.ListCdcSegmentsResponse;
+import org.apache.cassandra.sidecar.common.response.data.CdcSegmentInfo;
+import org.apache.cassandra.sidecar.concurrent.ExecutorPools;
+import org.apache.cassandra.sidecar.concurrent.TaskExecutorPool;
+import org.apache.cassandra.sidecar.config.ServiceConfiguration;
+import org.apache.cassandra.sidecar.config.SidecarConfiguration;
+import org.apache.cassandra.sidecar.routes.AbstractHandler;
+import org.apache.cassandra.sidecar.utils.CassandraInputValidator;
+import org.apache.cassandra.sidecar.utils.CdcUtil;
+import org.apache.cassandra.sidecar.utils.InstanceMetadataFetcher;
+
+import static org.apache.cassandra.sidecar.utils.CdcUtil.getIdxFileName;
+import static org.apache.cassandra.sidecar.utils.CdcUtil.getLogFilePrefix;
+import static org.apache.cassandra.sidecar.utils.CdcUtil.isIndexFile;
+import static org.apache.cassandra.sidecar.utils.CdcUtil.parseIndexFile;
+
+/**
+ * Provides REST endpoint for listing commit logs in CDC directory.
+ */
+@Singleton
+public class ListCdcDirHandler extends AbstractHandler<Void>
+{
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(ListCdcDirHandler.class);
+    private final ServiceConfiguration config;
+    private final TaskExecutorPool serviceExecutorPool;
+
+    @Inject
+    public ListCdcDirHandler(InstanceMetadataFetcher metadataFetcher,
+                             SidecarConfiguration config,
+                             ExecutorPools executorPools,
+                             CassandraInputValidator validator)
+    {
+        super(metadataFetcher, executorPools, validator);
+        this.config = config.serviceConfiguration();
+        this.serviceExecutorPool = executorPools.service();
+    }
+
+    @Override
+    protected void handleInternal(RoutingContext context,
+                                  HttpServerRequest httpRequest,
+                                  String host,
+                                  SocketAddress remoteAddress,
+                                  Void request)
+    {
+        String cdcDir = metadataFetcher.instance(host).cdcDir();
+        serviceExecutorPool
+        .executeBlocking(() -> collectCdcSegmentsFromFileSystem(cdcDir))
+        .map(segments -> new ListCdcSegmentsResponse(config.host(), 
config.port(), segments))
+        .onSuccess(context::json)
+        .onFailure(cause -> {
+            LOGGER.warn("Error listing the CDC commit log segments", cause);
+            context.response()
+                   
.setStatusCode(HttpResponseStatus.INTERNAL_SERVER_ERROR.code())
+                   .setStatusMessage(cause.getMessage())
+                   .end();
+        });
+    }
+
+    private List<CdcSegmentInfo> collectCdcSegmentsFromFileSystem(String 
cdcDirPath) throws IOException
+    {
+        List<CdcSegmentInfo> segmentInfos = new ArrayList<>();
+        File cdcDir = Paths.get(cdcDirPath).toAbsolutePath().toFile();
+        if (!cdcDir.isDirectory())
+        {
+            throw new IOException("CDC directory does not exist");
+        }
+
+        File[] cdcFiles = cdcDir.listFiles();
+        if (cdcFiles == null || cdcFiles.length == 0)
+        {
+            return segmentInfos;
+        }
+
+        Set<String> idxFileNamePrefixes = new HashSet<>();
+        for (File cdcFile : cdcFiles)
+        {
+            if (CdcUtil.matchIndexExtension(cdcFile.getName()))
+            {
+                
idxFileNamePrefixes.add(CdcUtil.getIdxFilePrefix(cdcFile.getName()));
+            }
+        }
+
+        for (File cdcFile : cdcFiles)
+        {
+            String fileName = cdcFile.getName();
+            BasicFileAttributes fileAttributes = 
Files.readAttributes(cdcFile.toPath(), BasicFileAttributes.class);
+            if (!cdcFile.exists()
+                || !fileAttributes.isRegularFile() // the file just gets 
deleted? ignore it
+                || isIndexFile(fileName) // ignore all .idx files
+                || !idxFileNamePrefixes.contains(getLogFilePrefix(fileName))) 
// ignore .log files found without matching .idx files
+            {
+                continue;
+            }
+
+            CdcUtil.CdcIndex cdcIndex = parseIndexFile(new File(cdcDirPath, 
getIdxFileName(fileName)), fileAttributes.size());
+            CdcSegmentInfo segmentInfo =
+            new CdcSegmentInfo(fileName, fileAttributes.size(),
+                               cdcIndex.latestFlushPosition, 
cdcIndex.isCompleted,
+                               fileAttributes.lastModifiedTime().toMillis());
+            segmentInfos.add(segmentInfo);
+        }
+        return segmentInfos;
+    }
+
+    @Override
+    protected Void extractParamsOrThrow(RoutingContext context)
+    {
+        return null;
+    }
+}
diff --git 
a/server/src/main/java/org/apache/cassandra/sidecar/routes/cdc/StreamCdcSegmentHandler.java
 
b/server/src/main/java/org/apache/cassandra/sidecar/routes/cdc/StreamCdcSegmentHandler.java
new file mode 100644
index 00000000..0c184e77
--- /dev/null
+++ 
b/server/src/main/java/org/apache/cassandra/sidecar/routes/cdc/StreamCdcSegmentHandler.java
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.routes.cdc;
+
+import java.io.File;
+import java.io.IOException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import io.netty.handler.codec.http.HttpHeaderNames;
+import io.netty.handler.codec.http.HttpResponseStatus;
+import io.vertx.core.Future;
+import io.vertx.core.http.HttpServerRequest;
+import io.vertx.core.net.SocketAddress;
+import io.vertx.ext.web.RoutingContext;
+import io.vertx.ext.web.handler.HttpException;
+import org.apache.cassandra.sidecar.cdc.CdcLogCache;
+import org.apache.cassandra.sidecar.cluster.instance.InstanceMetadata;
+import org.apache.cassandra.sidecar.concurrent.ExecutorPools;
+import org.apache.cassandra.sidecar.concurrent.TaskExecutorPool;
+import org.apache.cassandra.sidecar.models.HttpResponse;
+import org.apache.cassandra.sidecar.routes.AbstractHandler;
+import org.apache.cassandra.sidecar.utils.CassandraInputValidator;
+import org.apache.cassandra.sidecar.utils.CdcUtil;
+import org.apache.cassandra.sidecar.utils.FileStreamer;
+import org.apache.cassandra.sidecar.utils.InstanceMetadataFetcher;
+
+import static org.apache.cassandra.sidecar.utils.CdcUtil.getIdxFileName;
+import static org.apache.cassandra.sidecar.utils.CdcUtil.isLogFile;
+import static org.apache.cassandra.sidecar.utils.CdcUtil.isValid;
+import static org.apache.cassandra.sidecar.utils.CdcUtil.parseIndexFile;
+import static 
org.apache.cassandra.sidecar.utils.HttpExceptions.wrapHttpException;
+
+/**
+ * Provides REST endpoint for streaming cdc commit logs.
+ */
+@Singleton
+public class StreamCdcSegmentHandler extends AbstractHandler<String>
+{
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(StreamCdcSegmentHandler.class);
+
+    private final FileStreamer fileStreamer;
+    private final CdcLogCache cdcLogCache;
+    private final TaskExecutorPool serviceExecutorPool;
+
+    @Inject
+    public StreamCdcSegmentHandler(InstanceMetadataFetcher metadataFetcher,
+                                   FileStreamer fileStreamer,
+                                   CdcLogCache cdcLogCache,
+                                   ExecutorPools executorPools,
+                                   CassandraInputValidator validator)
+    {
+        super(metadataFetcher, executorPools, validator);
+        this.fileStreamer = fileStreamer;
+        this.cdcLogCache = cdcLogCache;
+        this.serviceExecutorPool = executorPools.service();
+    }
+
+    @Override
+    protected void handleInternal(RoutingContext context,
+                                  HttpServerRequest httpRequest,
+                                  String host,
+                                  SocketAddress remoteAddress,
+                                  String segment)
+    {
+        cdcLogCache.initMaybe();
+        InstanceMetadata instance = metadataFetcher.instance(host);
+        String cdcDir = instance.cdcDir();
+        File segmentFile = new File(cdcDir, segment);
+        validateCdcSegmentFile(segmentFile);
+
+        String indexFileName = getIdxFileName(segment);
+        File indexFile = new File(cdcDir, indexFileName);
+        HttpResponse response = new HttpResponse(context.request(), 
context.response());
+        streamCdcSegmentAsync(context, segmentFile, indexFile, response, 
instance)
+        // Touch the files at the end of the request
+        // If the file exists in cache, its expiry is extended; otherwise, the 
cache is not changed.
+        .onSuccess(res -> cdcLogCache.touch(segmentFile, indexFile))
+        .onFailure(cause -> processFailure(cause, context, host, 
remoteAddress, segment));
+    }
+
+    private Future<Void> streamCdcSegmentAsync(RoutingContext context,
+                                               File segmentFile,
+                                               File indexFile,
+                                               HttpResponse response,
+                                               InstanceMetadata instance)
+    {
+        long segmentFileLength = segmentFile.length();
+        return getOrCreateLinkedCdcFilePairAsync(segmentFile, indexFile)
+               .compose(cdcFilePair ->
+                        openCdcIndexFileAsync(cdcFilePair)
+                        .compose(cdcIndex -> {
+                            // stream the segment file; depending on whether 
the cdc segment is complete or not, cap the range of file to stream
+                            String rangeHeader = 
context.request().getHeader(HttpHeaderNames.RANGE);
+                            return cdcIndex.isCompleted
+                                   ? 
fileStreamer.parseRangeHeader(rangeHeader, segmentFileLength)
+                                   : 
fileStreamer.parseRangeHeader(rangeHeader, cdcIndex.latestFlushPosition);
+                        })
+                        .compose(range -> fileStreamer.stream(response, 
instance.id(), cdcFilePair.segmentFile.getAbsolutePath(), segmentFileLength, 
range))
+               );
+    }
+
+    @Override
+    protected String extractParamsOrThrow(RoutingContext context)
+    {
+        return context.request().getParam("segment");
+    }
+
+    private void validateCdcSegmentFile(File segmentFile) throws HttpException
+    {
+        // We only accept stream request for log files
+        if (!isValid(segmentFile.getName()) || 
!isLogFile(segmentFile.getName()))
+        {
+            throw wrapHttpException(HttpResponseStatus.BAD_REQUEST, "Invalid 
path param for CDC segment: " + segmentFile.getName());
+        }
+
+        // check file existence
+        if (!segmentFile.exists())
+        {
+            throw wrapHttpException(HttpResponseStatus.NOT_FOUND, "CDC segment 
not found: " + segmentFile.getName());
+        }
+
+        // check file content
+        long fileSize = segmentFile.length();
+        if (fileSize == 0)
+        {
+            throw 
wrapHttpException(HttpResponseStatus.REQUESTED_RANGE_NOT_SATISFIABLE, "File is 
empty");
+        }
+    }
+
+    private Future<CdcFilePair> getOrCreateLinkedCdcFilePairAsync(File 
segmentFile, File indexFile)
+    {
+        return serviceExecutorPool.executeBlocking(() -> {
+            // hardlink the segment and its index file,
+            // in order to guarantee the files exist in the subsequent calls 
(within the cache duration)
+            try
+            {
+                File linkedSegmentFile = 
cdcLogCache.createLinkedFileInCache(segmentFile);
+                File linkedIndexFile = 
cdcLogCache.createLinkedFileInCache(indexFile);
+                return new CdcFilePair(linkedSegmentFile, linkedIndexFile);
+            }
+            catch (IOException e)
+            {
+                LOGGER.debug("Failed to prepare CDC segment to stream", e);
+                throw 
wrapHttpException(HttpResponseStatus.INTERNAL_SERVER_ERROR, "Failed to prepare 
CDC segment to stream", e);
+            }
+        });
+    }
+
+    private Future<CdcUtil.CdcIndex> openCdcIndexFileAsync(CdcFilePair 
cdcFilePair)
+    {
+        return serviceExecutorPool.executeBlocking(() -> {
+            // check index file is accessible
+            try
+            {
+                return parseIndexFile(cdcFilePair.indexFile, 
cdcFilePair.segmentFile.length());
+            }
+            catch (IOException e)
+            {
+                LOGGER.error("Failed to read CDC index file", e);
+                throw 
wrapHttpException(HttpResponseStatus.INTERNAL_SERVER_ERROR, "Failed to read CDC 
index file", e);
+            }
+        });
+    }
+
+    private static class CdcFilePair
+    {
+        private final File segmentFile;
+        private final File indexFile;
+
+        private CdcFilePair(File segmentFile, File indexFile)
+        {
+            this.segmentFile = segmentFile;
+            this.indexFile = indexFile;
+        }
+    }
+}
diff --git 
a/server/src/main/java/org/apache/cassandra/sidecar/server/MainModule.java 
b/server/src/main/java/org/apache/cassandra/sidecar/server/MainModule.java
index 93b9f810..e135e6a1 100644
--- a/server/src/main/java/org/apache/cassandra/sidecar/server/MainModule.java
+++ b/server/src/main/java/org/apache/cassandra/sidecar/server/MainModule.java
@@ -106,6 +106,8 @@ import 
org.apache.cassandra.sidecar.routes.StreamSSTableComponentHandler;
 import org.apache.cassandra.sidecar.routes.TimeSkewHandler;
 import org.apache.cassandra.sidecar.routes.TokenRangeReplicaMapHandler;
 import org.apache.cassandra.sidecar.routes.cassandra.NodeSettingsHandler;
+import org.apache.cassandra.sidecar.routes.cdc.ListCdcDirHandler;
+import org.apache.cassandra.sidecar.routes.cdc.StreamCdcSegmentHandler;
 import org.apache.cassandra.sidecar.routes.restore.AbortRestoreJobHandler;
 import org.apache.cassandra.sidecar.routes.restore.CreateRestoreJobHandler;
 import org.apache.cassandra.sidecar.routes.restore.CreateRestoreSliceHandler;
@@ -255,6 +257,8 @@ public class MainModule extends AbstractModule
                               SSTableUploadHandler ssTableUploadHandler,
                               SSTableImportHandler ssTableImportHandler,
                               SSTableCleanupHandler ssTableCleanupHandler,
+                              StreamCdcSegmentHandler streamCdcSegmentHandler,
+                              ListCdcDirHandler listCdcDirHandler,
                               RestoreRequestValidationHandler 
validateRestoreJobRequest,
                               DiskSpaceProtectionHandler diskSpaceProtection,
                               ValidateTableExistenceHandler 
validateTableExistence,
@@ -417,6 +421,12 @@ public class MainModule extends AbstractModule
               .handler(validateRestoreJobRequest)
               .handler(restoreJobProgressHandler);
 
+        // CDC APIs
+        router.get(ApiEndpointsV1.LIST_CDC_SEGMENTS_ROUTE)
+              .handler(listCdcDirHandler);
+        router.get(ApiEndpointsV1.STREAM_CDC_SEGMENTS_ROUTE)
+              .handler(streamCdcSegmentHandler);
+
         return router;
     }
 
@@ -700,6 +710,7 @@ public class MainModule extends AbstractModule
                                    .port(port)
                                    .dataDirs(cassandraInstance.dataDirs())
                                    .stagingDir(cassandraInstance.stagingDir())
+                                   .cdcDir(cassandraInstance.cdcDir())
                                    .delegate(delegate)
                                    .metricRegistry(instanceSpecificRegistry)
                                    .build();
diff --git 
a/server/src/main/java/org/apache/cassandra/sidecar/utils/CdcUtil.java 
b/server/src/main/java/org/apache/cassandra/sidecar/utils/CdcUtil.java
new file mode 100644
index 00000000..91aa3029
--- /dev/null
+++ b/server/src/main/java/org/apache/cassandra/sidecar/utils/CdcUtil.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.utils;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.util.List;
+import java.util.regex.Pattern;
+
+/**
+ * Class with utility methods for CDC.
+ */
+public final class CdcUtil
+{
+    private static final String SEPARATOR = "-";
+    private static final String FILENAME_PREFIX = "CommitLog" + SEPARATOR;
+    private static final String LOG_FILE_EXTENSION = ".log";
+    private static final String IDX_FILE_EXTENSION = "_cdc.idx";
+    private static final int LOG_FILE_EXTENSION_LENGTH = 
LOG_FILE_EXTENSION.length();
+    private static final int IDX_FILE_EXTENSION_LENGTH = 
IDX_FILE_EXTENSION.length();
+    private static final String LOG_FILE_COMPLETE_INDICATOR = "COMPLETED";
+    private static final String FILENAME_EXTENSION = "(" + IDX_FILE_EXTENSION 
+ "|" + LOG_FILE_EXTENSION + ")";
+    private static final Pattern SEGMENT_PATTERN = 
Pattern.compile(FILENAME_PREFIX + "((\\d+)(" + SEPARATOR + "\\d+)?)" + 
FILENAME_EXTENSION);
+
+    private static final int READ_INDEX_FILE_MAX_RETRY = 5;
+
+    private CdcUtil()
+    {
+        throw new UnsupportedOperationException("Do not instantiate.");
+    }
+
+    public static String getIdxFilePrefix(String idxFileName)
+    {
+        return idxFileName.substring(0, idxFileName.length() - 
IDX_FILE_EXTENSION_LENGTH);
+    }
+
+    public static String getLogFilePrefix(String logFileName)
+    {
+        return logFileName.substring(0, logFileName.length() - 
LOG_FILE_EXTENSION_LENGTH);
+    }
+
+    public static String getIdxFileName(String logFileName)
+    {
+        return logFileName.replace(LOG_FILE_EXTENSION, IDX_FILE_EXTENSION);
+    }
+
+    public static File getIdxFile(File logFile)
+    {
+        return new File(logFile.getParent(), 
getIdxFileName(logFile.getName()));
+    }
+
+    public static String getLogFileName(String indexFileName)
+    {
+        return indexFileName.replace(IDX_FILE_EXTENSION, LOG_FILE_EXTENSION);
+    }
+
+    public static CdcIndex parseIndexFile(File indexFile, long 
segmentFileLength) throws IOException
+    {
+        List<String> lines = null;
+        // For an index file, if it exists, it should have non-empty content.
+        // Therefore, the lines read from the file should not be empty.
+        // If it is empty, retry reading the file. The cause is the contention 
between the reader and the index file writer.
+        // In most case, the loop should only run once.
+        for (int i = 0; i < READ_INDEX_FILE_MAX_RETRY; i++)
+        {
+            try
+            {
+                lines = Files.readAllLines(indexFile.toPath());
+                if (!lines.isEmpty())
+                    break;
+            }
+            catch (IOException e)
+            {
+                throw new IOException("Unable to parse the CDC segment index 
file " + indexFile.getName(), e);
+            }
+        }
+        if (lines.isEmpty())
+        {
+            throw new IOException("Unable to read anything from the CDC 
segment index file " + indexFile.getName());
+        }
+
+        final String lastLine = lines.get(lines.size() - 1);
+        final boolean isCompleted = 
lastLine.equals(LOG_FILE_COMPLETE_INDICATOR);
+        final long latestPosition = isCompleted ? segmentFileLength : 
Long.parseLong(lastLine);
+        return new CdcIndex(latestPosition, isCompleted);
+    }
+
+    /**
+     * Class representing Cdc index
+     */
+    public static class CdcIndex
+    {
+        public final long latestFlushPosition;
+        public final boolean isCompleted;
+
+        public CdcIndex(long latestFlushPosition, boolean isCompleted)
+        {
+            this.latestFlushPosition = latestFlushPosition;
+            this.isCompleted = isCompleted;
+        }
+    }
+
+    /**
+     * Validate for the cdc (log or index) file name.see {@link 
SEGMENT_PATTERN} for the format
+     * @param fileName name of the file
+     * @return true if the name is valid; otherwise, false
+     */
+    public static boolean isValid(String fileName)
+    {
+        return SEGMENT_PATTERN.matcher(fileName).matches();
+    }
+
+    public static boolean isLogFile(String fileName)
+    {
+        return isValid(fileName) && fileName.endsWith(LOG_FILE_EXTENSION);
+    }
+
+    public static boolean isIndexFile(String fileName)
+    {
+        return isValid(fileName) && matchIndexExtension(fileName);
+    }
+
+    public static boolean matchIndexExtension(String fileName)
+    {
+        return fileName.endsWith(IDX_FILE_EXTENSION);
+    }
+}
diff --git 
a/server/src/main/java/org/apache/cassandra/sidecar/utils/FileStreamer.java 
b/server/src/main/java/org/apache/cassandra/sidecar/utils/FileStreamer.java
index 1825863d..10425e17 100644
--- a/server/src/main/java/org/apache/cassandra/sidecar/utils/FileStreamer.java
+++ b/server/src/main/java/org/apache/cassandra/sidecar/utils/FileStreamer.java
@@ -42,6 +42,7 @@ import org.apache.cassandra.sidecar.models.HttpResponse;
 import static java.util.concurrent.TimeUnit.MICROSECONDS;
 import static 
io.netty.handler.codec.http.HttpResponseStatus.REQUESTED_RANGE_NOT_SATISFIABLE;
 import static io.netty.handler.codec.http.HttpResponseStatus.TOO_MANY_REQUESTS;
+import static 
org.apache.cassandra.sidecar.utils.HttpExceptions.wrapHttpException;
 import static 
org.apache.cassandra.sidecar.utils.MetricUtils.parseSSTableComponent;
 
 /**
@@ -219,7 +220,7 @@ public class FileStreamer
      * @param fileLength  The length of the file
      * @return a succeeded future when the parsing is successful, a failed 
future when the range parsing fails
      */
-    private Future<HttpRange> parseRangeHeader(String rangeHeader, long 
fileLength)
+    public Future<HttpRange> parseRangeHeader(String rangeHeader, long 
fileLength)
     {
         HttpRange fr = HttpRange.of(0, fileLength - 1);
         if (rangeHeader == null)
@@ -236,7 +237,7 @@ public class FileStreamer
         catch (IllegalArgumentException | RangeException | 
UnsupportedOperationException e)
         {
             LOGGER.error("Failed to parse header '{}'", rangeHeader, e);
-            return Future.failedFuture(new 
HttpException(REQUESTED_RANGE_NOT_SATISFIABLE.code()));
+            return 
Future.failedFuture(wrapHttpException(REQUESTED_RANGE_NOT_SATISFIABLE, 
e.getMessage(), e));
         }
     }
 }
diff --git a/server/src/test/java/org/apache/cassandra/sidecar/TestModule.java 
b/server/src/test/java/org/apache/cassandra/sidecar/TestModule.java
index 4b19a3c5..48e76c3b 100644
--- a/server/src/test/java/org/apache/cassandra/sidecar/TestModule.java
+++ b/server/src/test/java/org/apache/cassandra/sidecar/TestModule.java
@@ -42,6 +42,7 @@ import 
org.apache.cassandra.sidecar.common.response.NodeSettings;
 import org.apache.cassandra.sidecar.common.server.StorageOperations;
 import org.apache.cassandra.sidecar.common.server.dns.DnsResolver;
 import org.apache.cassandra.sidecar.config.AccessControlConfiguration;
+import org.apache.cassandra.sidecar.config.CdcConfiguration;
 import org.apache.cassandra.sidecar.config.HealthCheckConfiguration;
 import org.apache.cassandra.sidecar.config.RestoreJobConfiguration;
 import org.apache.cassandra.sidecar.config.SSTableUploadConfiguration;
@@ -51,6 +52,7 @@ import 
org.apache.cassandra.sidecar.config.SidecarConfiguration;
 import org.apache.cassandra.sidecar.config.SslConfiguration;
 import org.apache.cassandra.sidecar.config.ThrottleConfiguration;
 import org.apache.cassandra.sidecar.config.yaml.AccessControlConfigurationImpl;
+import org.apache.cassandra.sidecar.config.yaml.CdcConfigurationImpl;
 import org.apache.cassandra.sidecar.config.yaml.HealthCheckConfigurationImpl;
 import org.apache.cassandra.sidecar.config.yaml.RestoreJobConfigurationImpl;
 import org.apache.cassandra.sidecar.config.yaml.SSTableUploadConfigurationImpl;
@@ -114,6 +116,7 @@ public class TestModule extends AbstractModule
     {
         ThrottleConfiguration throttleConfiguration = new 
ThrottleConfigurationImpl(5, 5);
         SSTableUploadConfiguration uploadConfiguration = new 
SSTableUploadConfigurationImpl(0F);
+        CdcConfiguration cdcConfiguration = new CdcConfigurationImpl(1L);
         SchemaKeyspaceConfiguration schemaKeyspaceConfiguration =
         SchemaKeyspaceConfigurationImpl.builder()
                                        .isEnabled(true)
@@ -126,6 +129,7 @@ public class TestModule extends AbstractModule
                                 .throttleConfiguration(throttleConfiguration)
                                 
.schemaKeyspaceConfiguration(schemaKeyspaceConfiguration)
                                 
.sstableUploadConfiguration(uploadConfiguration)
+                                .cdcConfiguration(cdcConfiguration)
                                 .build();
         RestoreJobConfiguration restoreJobConfiguration =
         RestoreJobConfigurationImpl.builder()
@@ -167,16 +171,19 @@ public class TestModule extends AbstractModule
                                                   1,
                                                   
"src/test/resources/instance1/data",
                                                   
"src/test/resources/instance1/sstable-staging",
+                                                  
"src/test/resources/instance1/cdc_raw",
                                                   true);
         InstanceMetadata instance2 = mockInstance("localhost2",
                                                   2,
                                                   
"src/test/resources/instance2/data",
                                                   
"src/test/resources/instance2/sstable-staging",
+                                                  
"src/test/resources/instance2/cdc_raw",
                                                   false);
         InstanceMetadata instance3 = mockInstance("localhost3",
                                                   3,
                                                   
"src/test/resources/instance3/data",
                                                   
"src/test/resources/instance3/sstable-staging",
+                                                  
"src/test/resources/instance3/cdc_raw",
                                                   true);
         final List<InstanceMetadata> instanceMetas = new ArrayList<>();
         instanceMetas.add(instance1);
@@ -185,13 +192,14 @@ public class TestModule extends AbstractModule
         return instanceMetas;
     }
 
-    private InstanceMetadata mockInstance(String host, int id, String dataDir, 
String stagingDir, boolean isUp)
+    private InstanceMetadata mockInstance(String host, int id, String dataDir, 
String stagingDir, String cdcDir, boolean isUp)
     {
         InstanceMetadata instanceMeta = mock(InstanceMetadata.class);
         when(instanceMeta.id()).thenReturn(id);
         when(instanceMeta.host()).thenReturn(host);
         when(instanceMeta.port()).thenReturn(6475);
         when(instanceMeta.stagingDir()).thenReturn(stagingDir);
+        when(instanceMeta.cdcDir()).thenReturn(cdcDir);
         List<String> dataDirectories = Collections.singletonList(dataDir);
         when(instanceMeta.dataDirs()).thenReturn(dataDirectories);
         when(instanceMeta.metrics()).thenReturn(new 
InstanceMetricsImpl(registry(id)));
diff --git 
a/server/src/test/java/org/apache/cassandra/sidecar/cdc/CDCLogCacheTest.java 
b/server/src/test/java/org/apache/cassandra/sidecar/cdc/CDCLogCacheTest.java
new file mode 100644
index 00000000..05875b68
--- /dev/null
+++ b/server/src/test/java/org/apache/cassandra/sidecar/cdc/CDCLogCacheTest.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.cdc;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+
+import com.google.common.base.Preconditions;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import com.google.inject.Guice;
+import com.google.inject.Injector;
+import com.google.inject.util.Modules;
+import io.vertx.core.Vertx;
+import org.apache.cassandra.sidecar.ExecutorPoolsHelper;
+import org.apache.cassandra.sidecar.TestModule;
+import org.apache.cassandra.sidecar.cluster.InstancesConfig;
+import org.apache.cassandra.sidecar.concurrent.ExecutorPools;
+import org.apache.cassandra.sidecar.config.SidecarConfiguration;
+import org.apache.cassandra.sidecar.server.MainModule;
+
+import static org.apache.cassandra.sidecar.AssertionUtils.loopAssert;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatNoException;
+import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+class CdcLogCacheTest
+{
+    private final Injector injector = 
Guice.createInjector(Modules.override(new MainModule()).with(new TestModule()));
+    private final InstancesConfig instancesConfig = 
injector.getInstance(InstancesConfig.class);
+    private final CdcLogCache logCache = cdcLogCache();
+
+    @BeforeEach
+    void beforeEach()
+    {
+        logCache.initMaybe();
+        logCache.hardlinkCache.invalidateAll();
+        assertThat(logCache.hardlinkCache.size()).isZero();
+    }
+
+    @Test
+    void testLinkedFileExpiryInCache() throws IOException
+    {
+        File commitLogFile = instance1CommitLogFile();
+        File linkedCommitLog = logCache.createLinkedFileInCache(commitLogFile);
+        assertThat(linkedCommitLog).isNotNull();
+        assertThat(logCache.hardlinkCache.size()).isOne();
+
+        // wait for file to expire
+        loopAssert(2, () -> 
assertThat(logCache.hardlinkCache.size()).isZero());
+    }
+
+    @Test
+    void testCleanUpLinkedFiles() throws IOException
+    {
+        File commitLogFile = instance1CommitLogFile();
+        File linkedFile = logCache.createLinkedFileInCache(commitLogFile);
+        assertThat(linkedFile.exists()).isTrue();
+
+        // Verify that cleanup deletes the linked file
+        logCache.cleanupLinkedFilesOnStartup(instancesConfig);
+        assertThat(linkedFile.exists()).isFalse();
+    }
+
+    @Test
+    void testCreateLinkedFileInCache() throws IOException
+    {
+        File commitLogFile = instance1CommitLogFile();
+        File linkedCommitLog = logCache.createLinkedFileInCache(commitLogFile);
+
+        // Check if hard link is created
+        assertThat(commitLogFile).isNotEqualTo(linkedCommitLog);
+        assertThat(Files.isSameFile(commitLogFile.toPath(), 
linkedCommitLog.toPath())).isTrue();
+
+        // Should return cached linked file in subsequent calls
+        
assertThat(linkedCommitLog).isEqualTo(logCache.createLinkedFileInCache(commitLogFile));
+    }
+
+    /**
+     * Failing to clean up shouldn't fail to initialize the class
+     */
+    @Test
+    void testCleanupErrorDoesntPreventInitialization()
+    {
+        assertThatNoException().isThrownBy(() -> {
+            new 
FailingCdcLogCache(ExecutorPoolsHelper.createdSharedTestPool(Vertx.vertx()), 
instancesConfig, sidecarConfiguration());
+        });
+    }
+
+    private File instance1CommitLogFile()
+    {
+        String commitLogPathOnInstance1 = 
instancesConfig.instances().get(0).cdcDir() + "/CommitLog-1-1.log";
+        return new File(commitLogPathOnInstance1);
+    }
+
+    private CdcLogCache cdcLogCache()
+    {
+        ExecutorPools executorPools = 
ExecutorPoolsHelper.createdSharedTestPool(Vertx.vertx());
+        return new CdcLogCache(executorPools, instancesConfig, 100L);
+    }
+
+    private SidecarConfiguration sidecarConfiguration()
+    {
+        SidecarConfiguration sidecarConfiguration = 
mock(SidecarConfiguration.class, RETURNS_DEEP_STUBS);
+        
when(sidecarConfiguration.serviceConfiguration().cdcConfiguration().segmentHardlinkCacheExpiryInSecs()).thenReturn(1L);
+        return sidecarConfiguration;
+    }
+
+    static class FailingCdcLogCache extends CdcLogCache
+    {
+        public FailingCdcLogCache(ExecutorPools executorPools, InstancesConfig 
cassandraConfig, SidecarConfiguration sidecarConfig)
+        {
+            super(executorPools, cassandraConfig, sidecarConfig);
+        }
+
+        @Override
+        public void cleanupLinkedFilesOnStartup(InstancesConfig config)
+        {
+            // Fake an error to simulate the initialization issue
+            Preconditions.checkState(false, "cdc_raw_tmp should be a 
directory");
+        }
+    }
+}
diff --git 
a/server/src/test/java/org/apache/cassandra/sidecar/config/SidecarConfigurationTest.java
 
b/server/src/test/java/org/apache/cassandra/sidecar/config/SidecarConfigurationTest.java
index a9fd5505..362219a9 100644
--- 
a/server/src/test/java/org/apache/cassandra/sidecar/config/SidecarConfigurationTest.java
+++ 
b/server/src/test/java/org/apache/cassandra/sidecar/config/SidecarConfigurationTest.java
@@ -304,6 +304,15 @@ class SidecarConfigurationTest
         assertThat(vertxFsOptions.classpathResolvingEnabled()).isTrue();
     }
 
+    @Test
+    void testCdcCofiguration() throws IOException
+    {
+        Path yamlPath = yaml("config/sidecar_cdc.yaml");
+        SidecarConfigurationImpl sidecarConfiguration = 
SidecarConfigurationImpl.readYamlConfiguration(yamlPath);
+        assertThat(sidecarConfiguration).isNotNull();
+        
assertThat(sidecarConfiguration.serviceConfiguration().cdcConfiguration().segmentHardlinkCacheExpiryInSecs()).isEqualTo(60L);
+    }
+
     @Test
     void testAccessControlConfiguration() throws Exception
     {
diff --git 
a/server/src/test/java/org/apache/cassandra/sidecar/routes/cdc/ListCdcDirHandlerTest.java
 
b/server/src/test/java/org/apache/cassandra/sidecar/routes/cdc/ListCdcDirHandlerTest.java
new file mode 100644
index 00000000..ac206275
--- /dev/null
+++ 
b/server/src/test/java/org/apache/cassandra/sidecar/routes/cdc/ListCdcDirHandlerTest.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.routes.cdc;
+
+import java.io.IOException;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import com.google.inject.Guice;
+import com.google.inject.Injector;
+import com.google.inject.Module;
+import com.google.inject.util.Modules;
+import io.vertx.core.Vertx;
+import io.vertx.ext.web.client.WebClient;
+import io.vertx.ext.web.client.predicate.ResponsePredicate;
+import io.vertx.ext.web.codec.BodyCodec;
+import io.vertx.junit5.VertxExtension;
+import io.vertx.junit5.VertxTestContext;
+import org.apache.cassandra.sidecar.TestModule;
+import org.apache.cassandra.sidecar.common.response.ListCdcSegmentsResponse;
+import org.apache.cassandra.sidecar.common.response.data.CdcSegmentInfo;
+import org.apache.cassandra.sidecar.server.MainModule;
+import org.apache.cassandra.sidecar.server.Server;
+
+import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
+
+/**
+ * Test class for testing ListCdcDirHandler class.
+ */
+@ExtendWith(VertxExtension.class)
+class ListCdcDirHandlerTest
+{
+    static final Logger LOGGER = 
LoggerFactory.getLogger(ListCdcDirHandlerTest.class);
+    private static final String ROUTE = "/api/v1/cdc/segments";
+    private Vertx vertx;
+    private Server server;
+
+    @BeforeEach
+    public void setUp() throws InterruptedException, IOException
+    {
+        Module testOverride = new TestModule();
+        Injector injector = Guice.createInjector(Modules.override(new 
MainModule())
+                .with(testOverride));
+        server = injector.getInstance(Server.class);
+        vertx = injector.getInstance(Vertx.class);
+
+        VertxTestContext context = new VertxTestContext();
+        server.start()
+                .onSuccess(s -> context.completeNow())
+                .onFailure(context::failNow);
+        context.awaitCompletion(5, TimeUnit.SECONDS);
+    }
+
+    @AfterEach
+    void after() throws InterruptedException
+    {
+        CountDownLatch closeLatch = new CountDownLatch(1);
+        server.close().onSuccess(res -> closeLatch.countDown());
+        if (closeLatch.await(60, TimeUnit.SECONDS))
+            LOGGER.info("Close event received before timeout.");
+        else
+            LOGGER.error("Close event timed out.");
+    }
+
+    @Test
+    void testRouteSucceeds(VertxTestContext context)
+    {
+        WebClient client = WebClient.create(vertx);
+        client.get(server.actualPort(), "localhost", ROUTE)
+              .as(BodyCodec.buffer())
+              .expect(ResponsePredicate.SC_OK)
+              .send(context.succeeding(resp -> {
+                  context.verify(() -> {
+                      ListCdcSegmentsResponse listCDCSegmentsResponse = 
resp.bodyAsJson(ListCdcSegmentsResponse.class);
+                      
assertThat(listCDCSegmentsResponse.segmentInfos().size()).isEqualTo(2);
+                      for (CdcSegmentInfo segmentInfo : 
listCDCSegmentsResponse.segmentInfos())
+                      {
+                          if (segmentInfo.name.equals("CommitLog-1-1.log"))
+                          {
+                              
assertThat(segmentInfo.name).isEqualTo("CommitLog-1-1.log");
+                              assertThat(segmentInfo.idx).isEqualTo(1);
+                              assertThat(segmentInfo.size).isEqualTo(1);
+                              assertThat(segmentInfo.completed).isTrue();
+                          }
+                      }
+                  });
+                  client.close();
+                  context.completeNow();
+              }));
+    }
+
+    @Test
+    void testLogFilesWithoutIdxFilesNotStreamed(VertxTestContext context)
+    {
+        WebClient client = WebClient.create(vertx);
+        client.get(server.actualPort(), "localhost", ROUTE)
+              .as(BodyCodec.buffer())
+              .expect(ResponsePredicate.SC_OK)
+              .send(context.succeeding(resp -> {
+                  context.verify(() -> {
+                      ListCdcSegmentsResponse listCDCSegmentsResponse = 
resp.bodyAsJson(ListCdcSegmentsResponse.class);
+                      for (CdcSegmentInfo segmentInfo : 
listCDCSegmentsResponse.segmentInfos())
+                      {
+                          
assertThat(segmentInfo.name).isNotEqualTo("CommitLog-1-3.log");
+                      }
+                  });
+                  client.close();
+                  context.completeNow();
+              }));
+    }
+}
diff --git 
a/server/src/test/java/org/apache/cassandra/sidecar/routes/cdc/StreamCdcSegmentHandlerTest.java
 
b/server/src/test/java/org/apache/cassandra/sidecar/routes/cdc/StreamCdcSegmentHandlerTest.java
new file mode 100644
index 00000000..5f3a1760
--- /dev/null
+++ 
b/server/src/test/java/org/apache/cassandra/sidecar/routes/cdc/StreamCdcSegmentHandlerTest.java
@@ -0,0 +1,286 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.routes.cdc;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import com.google.common.util.concurrent.Uninterruptibles;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import com.google.inject.Guice;
+import com.google.inject.Injector;
+import com.google.inject.Module;
+import com.google.inject.util.Modules;
+import io.netty.handler.codec.http.HttpHeaderNames;
+import io.vertx.core.Vertx;
+import io.vertx.ext.web.client.WebClient;
+import io.vertx.ext.web.client.predicate.ResponsePredicate;
+import io.vertx.ext.web.codec.BodyCodec;
+import io.vertx.junit5.VertxExtension;
+import io.vertx.junit5.VertxTestContext;
+import org.apache.cassandra.sidecar.TestModule;
+import org.apache.cassandra.sidecar.cdc.CdcLogCache;
+import org.apache.cassandra.sidecar.server.MainModule;
+import org.apache.cassandra.sidecar.server.Server;
+
+import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
+
+/**
+ * Test class for StreamCDCSegmentHandler.
+ */
+@ExtendWith(VertxExtension.class)
+class StreamCdcSegmentHandlerTest
+{
+    public static final String CDC_RAW_DIR = 
"./src/test/resources/instance1/cdc_raw";
+    public static final String CDC_RAW_TEMP_DIR = CDC_RAW_DIR + 
CdcLogCache.TEMP_DIR_SUFFIX;
+    static final Logger LOGGER = 
LoggerFactory.getLogger(StreamCdcSegmentHandlerTest.class);
+    private Vertx vertx;
+    private Server server;
+
+    @BeforeEach
+    void setUp() throws InterruptedException, IOException
+    {
+        Module testOverride = new TestModule();
+        Injector injector = Guice.createInjector(Modules.override(new 
MainModule())
+                                                        .with(testOverride));
+        server = injector.getInstance(Server.class);
+        vertx = injector.getInstance(Vertx.class);
+
+        VertxTestContext context = new VertxTestContext();
+        server.start()
+              .onSuccess(s -> context.completeNow())
+              .onFailure(context::failNow);
+        context.awaitCompletion(5, TimeUnit.SECONDS);
+    }
+
+    @AfterEach
+    void after() throws InterruptedException
+    {
+        CountDownLatch closeLatch = new CountDownLatch(1);
+        server.close().onSuccess(res -> closeLatch.countDown());
+        if (closeLatch.await(60, TimeUnit.SECONDS))
+            LOGGER.info("Close event received before timeout.");
+        else
+            LOGGER.error("Close event timed out.");
+    }
+
+    @Test
+    void testRouteSucceeds(VertxTestContext context)
+    {
+        WebClient client = WebClient.create(vertx);
+        String testRoute = "/cdc/segments/CommitLog-1-1.log";
+        client.get(server.actualPort(), "localhost", "/api/v1" + testRoute)
+              .as(BodyCodec.buffer())
+              .expect(ResponsePredicate.SC_OK)
+              .send(context.succeeding(resp -> {
+                  context.verify(() -> 
assertThat(resp.bodyAsString()).isEqualTo("x"));
+                  client.close();
+                  context.completeNow();
+              }));
+        spinAssertCdcRawTempEmpty();
+    }
+
+    @Test  // A variant of testRouteSucceeds. It sends concurrent requests for 
the same segment and assert the all requests should be satisfied.
+    void testConcurrentRequestsForSegmentSucceeds(VertxTestContext context)
+    {
+        int requests = 5;
+        WebClient client = WebClient.create(vertx);
+        CountDownLatch latch = new CountDownLatch(requests);
+        String testRoute = "/cdc/segments/CommitLog-1-1.log";
+        for (int i = 0; i < requests; i++)
+        {
+            client.get(server.actualPort(), "localhost", "/api/v1" + testRoute)
+                  .as(BodyCodec.buffer())
+                  .expect(ResponsePredicate.SC_OK)
+                  .send(context.succeeding(resp -> {
+                      context.verify(() -> 
assertThat(resp.bodyAsString()).isEqualTo("x"));
+                      latch.countDown();
+                      if (latch.getCount() == 0)
+                      {
+                          context.completeNow();
+                      }
+                  }));
+        }
+        spinAssertCdcRawTempEmpty();
+    }
+
+    @Test // The server internal should re-create hardlinks for each request 
in this scenario.
+    void testSequentialRequestsForSegmentSucceeds(VertxTestContext context)
+    {
+        for (int i = 0; i < 3; i++)
+        {
+            testRouteSucceeds(context);
+        }
+    }
+
+    @Test
+    void testSegmentNotFoundSucceeds(VertxTestContext context)
+    {
+        WebClient client = WebClient.create(vertx);
+        String testRoute = "/cdc/segments/CommitLog-1-123456.log";
+        client.get(server.actualPort(), "localhost", "/api/v1" + testRoute)
+              .expect(ResponsePredicate.SC_NOT_FOUND)
+              .send(context.succeeding(resp -> {
+                  context.verify(() -> {
+                      assertThat(resp.statusCode()).isEqualTo(404);
+                      assertThat(resp.statusMessage()).isEqualTo("Not Found");
+                      assertThat(resp.bodyAsJsonObject().getString("message"))
+                      .isEqualTo("CDC segment not found: 
CommitLog-1-123456.log");
+                  });
+                  client.close();
+                  context.completeNow();
+              }));
+    }
+
+    @Test
+    void testIncorrectSegmentExtensionSucceeds(VertxTestContext context)
+    {
+        WebClient client = WebClient.create(vertx);
+        String testRoute = "/cdc/segments/CommitLog-1-1"; // not a valid file 
name, missing `.log` extension
+        client.get(server.actualPort(), "localhost", "/api/v1" + testRoute)
+              .expect(ResponsePredicate.SC_BAD_REQUEST)
+              .send(context.succeeding(resp -> {
+                  context.verify(() -> {
+                      assertThat(resp.statusCode()).isEqualTo(400);
+                      assertThat(resp.statusMessage()).isEqualTo("Bad 
Request");
+                      assertThat(resp.bodyAsJsonObject().getString("message"))
+                      .isEqualTo("Invalid path param for CDC segment: 
CommitLog-1-1");
+                  });
+                  context.completeNow();
+                  client.close();
+              }));
+    }
+
+    @Test
+    void testInvalidRangeFails(VertxTestContext context)
+    {
+        WebClient client = WebClient.create(vertx);
+        String testRoute = "/cdc/segments/CommitLog-1-1.log";
+        client.get(server.actualPort(), "localhost", "/api/v1" + testRoute)
+              .putHeader("Range", "bytes=4-3")
+              .expect(ResponsePredicate.SC_REQUESTED_RANGE_NOT_SATISFIABLE)
+              .send(context.succeeding(resp -> {
+                  context.verify(() -> {
+                      assertThat(resp.statusCode()).isEqualTo(416);
+                      assertThat(resp.statusMessage()).isEqualTo("Requested 
Range Not Satisfiable");
+                      assertThat(resp.bodyAsJsonObject().getString("message"))
+                      .isEqualTo("Range does not satisfy boundary 
requirements. range=[4, 3]");
+                  });
+                  client.close();
+                  context.completeNow();
+              }));
+        spinAssertCdcRawTempEmpty();
+    }
+
+    @Test
+    void testPartialRangeStreamedSucceeds(VertxTestContext context)
+    {
+        WebClient client = WebClient.create(vertx);
+        String testRoute = "/cdc/segments/CommitLog-1-2.log";
+        client.get(server.actualPort(), "localhost", "/api/v1" + testRoute)
+              .putHeader("Range", "bytes=0-2")
+              .as(BodyCodec.buffer())
+              .expect(ResponsePredicate.SC_PARTIAL_CONTENT)
+              .send(context.succeeding(resp -> {
+                  context.verify(() -> {
+                      
assertThat(resp.getHeader(HttpHeaderNames.CONTENT_LENGTH.toString()))
+                      .withFailMessage("It should only stream to the last 
flushed position: 1")
+                      .isEqualTo("1");
+                      // see src/test/resources/cdc_raw/CommitLog-1-2_cdc.idx
+                      
assertThat(resp.getHeader(HttpHeaderNames.CONTENT_RANGE.toString()))
+                      .withFailMessage("It should only stream to the last 
flushed position: 1")
+                      .isEqualTo("bytes 0-0/1");
+                      assertThat(resp.bodyAsString()).isEqualTo("x");
+                  });
+                  client.close();
+                  context.completeNow();
+              }));
+        spinAssertCdcRawTempEmpty();
+    }
+
+    @Test
+    void 
testPartialRangeStreamedForIncompleteLogFileWithoutRangeHeader(VertxTestContext 
context)
+    {
+        WebClient client = WebClient.create(vertx);
+        String testRoute = "/cdc/segments/CommitLog-1-2.log";
+        client.get(server.actualPort(), "localhost", "/api/v1" + testRoute)
+              .as(BodyCodec.buffer())
+              .expect(ResponsePredicate.SC_PARTIAL_CONTENT)
+              .send(context.succeeding(resp -> {
+                  context.verify(() -> 
assertThat(resp.bodyAsString()).isEqualTo("x"));
+                  client.close();
+                  context.completeNow();
+              }));
+        spinAssertCdcRawTempEmpty();
+    }
+
+    @Test
+    void testIdxFileIsNotStreamed(VertxTestContext context)
+    {
+        WebClient client = WebClient.create(vertx);
+        String testRoute = "/cdc/segments/CommitLog-1-2_cdc.idx";
+        client.get(server.actualPort(), "localhost", "/api/v1" + testRoute)
+              .as(BodyCodec.buffer())
+              .expect(ResponsePredicate.SC_BAD_REQUEST)
+              .send(context.succeeding(resp -> {
+                  context.verify(() -> {
+                      assertThat(resp.statusCode()).isEqualTo(400);
+                      assertThat(resp.statusMessage()).isEqualTo("Bad 
Request");
+                      assertThat(resp.bodyAsJsonObject().getString("message"))
+                      .isEqualTo("Invalid path param for CDC segment: 
CommitLog-1-2_cdc.idx");
+                  });
+                  client.close();
+                  context.completeNow();
+              }));
+    }
+
+    private void spinAssertCdcRawTempEmpty()
+    {
+        File cdcTempDir = new File(CDC_RAW_TEMP_DIR);
+        assertThat(cdcTempDir.exists()).isTrue();
+        int attempts = 10;
+        while (attempts > 0)
+        {
+            File[] files = cdcTempDir.listFiles();
+            assertThat(files).isNotNull();
+            if (files.length > 0)
+            {
+                Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS);
+                attempts--;
+                if (attempts == 0)
+                {
+                    assertThat(files.length)
+                    .withFailMessage("Expect empty directory. But found those 
files: " + Arrays.toString(files))
+                    .isEqualTo(0);
+                }
+            }
+            else
+            {
+                break;
+            }
+        }
+    }
+}
diff --git 
a/server/src/test/java/org/apache/cassandra/sidecar/routes/restore/RestoreJobProgressHandlerTest.java
 
b/server/src/test/java/org/apache/cassandra/sidecar/routes/restore/RestoreJobProgressHandlerTest.java
index 0ade5dc9..a91b7cb2 100644
--- 
a/server/src/test/java/org/apache/cassandra/sidecar/routes/restore/RestoreJobProgressHandlerTest.java
+++ 
b/server/src/test/java/org/apache/cassandra/sidecar/routes/restore/RestoreJobProgressHandlerTest.java
@@ -29,7 +29,6 @@ import java.util.stream.IntStream;
 
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.ExtendWith;
-
 import io.netty.handler.codec.http.HttpResponseStatus;
 import io.vertx.core.AsyncResult;
 import io.vertx.core.buffer.Buffer;
diff --git a/server/src/test/resources/config/sidecar_cdc.yaml 
b/server/src/test/resources/config/sidecar_cdc.yaml
new file mode 100644
index 00000000..5116049b
--- /dev/null
+++ b/server/src/test/resources/config/sidecar_cdc.yaml
@@ -0,0 +1,73 @@
+#
+# Cassandra SideCar configuration file
+#
+cassandra:
+  host: localhost
+  port: 9042
+  username: cassandra
+  password: cassandra
+  data_dirs:
+    - /ccm/test/node1/data0
+    - /ccm/test/node1/data1
+  staging_dir: /ccm/test/node1/sstable-staging
+  jmx_host: 127.0.0.1
+  jmx_port: 7199
+  jmx_role: controlRole
+  jmx_role_password: controlPassword
+  jmx_ssl_enabled: true
+
+sidecar:
+  host: 0.0.0.0
+  port: 0 # bind sever to the first available port
+  request_idle_timeout_millis: 300000 # this field expects integer value
+  request_timeout_millis: 300000
+  tcp_keep_alive: false
+  accept_backlog: 1024
+  server_verticle_instances: 2
+  throttle:
+    stream_requests_per_sec: 5000
+    timeout_sec: 10
+  traffic_shaping:
+    inbound_global_bandwidth_bps: 500
+    outbound_global_bandwidth_bps: 1500
+    peak_outbound_global_bandwidth_bps: 2000
+    max_delay_to_wait_millis: 2500
+    check_interval_for_stats_millis: 3000
+  sstable_upload:
+    concurrent_upload_limit: 80
+    min_free_space_percent: 10
+  allowable_time_skew_in_minutes: 60
+  sstable_import:
+    poll_interval_millis: 100
+    cache:
+      expire_after_access_millis: 7200000 # 2 hours
+      maximum_size: 10000
+  sstable_snapshot:
+    snapshot_list_cache:
+      expire_after_access_millis: 350
+      maximum_size: 450
+  worker_pools:
+    service:
+      name: "sidecar-worker-pool"
+      size: 20
+      max_execution_time_millis: 60000 # 60 seconds
+    internal:
+      name: "sidecar-internal-worker-pool"
+      size: 20
+      max_execution_time_millis: 900000 # 15 minutes
+  cdc:
+    segment_hardlink_cache_expiry_in_secs: 60 # 1 mins
+  jmx:
+    max_retries: 42
+    retry_delay_millis: 1234
+  schema:
+    is_enabled: false
+    keyspace: sidecar_internal
+    replication_strategy: SimpleStrategy
+    replication_factor: 1
+
+vertx:
+  filesystem_options:
+    classpath_resolving_enabled: true
+    file_cache_dir: /path/to/vertx/cache
+    file_caching_enabled: true
diff --git a/server/src/test/resources/instance1/cdc_raw/CommitLog-1-1.log 
b/server/src/test/resources/instance1/cdc_raw/CommitLog-1-1.log
new file mode 100644
index 00000000..c1b0730e
--- /dev/null
+++ b/server/src/test/resources/instance1/cdc_raw/CommitLog-1-1.log
@@ -0,0 +1 @@
+x
\ No newline at end of file
diff --git a/server/src/test/resources/instance1/cdc_raw/CommitLog-1-1_cdc.idx 
b/server/src/test/resources/instance1/cdc_raw/CommitLog-1-1_cdc.idx
new file mode 100644
index 00000000..70edf50b
--- /dev/null
+++ b/server/src/test/resources/instance1/cdc_raw/CommitLog-1-1_cdc.idx
@@ -0,0 +1,2 @@
+1
+COMPLETED
\ No newline at end of file
diff --git a/server/src/test/resources/instance1/cdc_raw/CommitLog-1-2.log 
b/server/src/test/resources/instance1/cdc_raw/CommitLog-1-2.log
new file mode 100644
index 00000000..5ee608e7
--- /dev/null
+++ b/server/src/test/resources/instance1/cdc_raw/CommitLog-1-2.log
@@ -0,0 +1 @@
+xxxx
\ No newline at end of file
diff --git a/server/src/test/resources/instance1/cdc_raw/CommitLog-1-2_cdc.idx 
b/server/src/test/resources/instance1/cdc_raw/CommitLog-1-2_cdc.idx
new file mode 100644
index 00000000..56a6051c
--- /dev/null
+++ b/server/src/test/resources/instance1/cdc_raw/CommitLog-1-2_cdc.idx
@@ -0,0 +1 @@
+1
\ No newline at end of file
diff --git a/server/src/test/resources/instance1/cdc_raw/CommitLog-1-3.log 
b/server/src/test/resources/instance1/cdc_raw/CommitLog-1-3.log
new file mode 100644
index 00000000..ac8522fb
--- /dev/null
+++ b/server/src/test/resources/instance1/cdc_raw/CommitLog-1-3.log
@@ -0,0 +1 @@
+xxx
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to