This is an automated email from the ASF dual-hosted git repository.
ycai pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra-analytics.git
The following commit(s) were added to refs/heads/trunk by this push:
new 458a363 CASSANDRA-19778: Split out BufferingInputStream stats into separate i… (#66)
458a363 is described below
commit 458a3630f882ae2b2a9cee272cf85ca7ff42f5cd
Author: jberragan <[email protected]>
AuthorDate: Wed Jul 17 14:29:21 2024 -0700
CASSANDRA-19778: Split out BufferingInputStream stats into separate i… (#66)
Split BufferingInputStream stats into separate interface so class-level
generics are not required for the Stats interface
Patch by James Berragan; Reviewed by Bernardo Botella, Francisco Guerrero,
Yifan Cai for CASSANDRA-19778
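For consumers of the Stats API, the migration follows the pattern visible in
the EndToEndTests change below: rather than subclassing Stats<SSTable> and
overriding the inputStream* callbacks directly, an implementation now
overrides bufferingInputStreamStats() to supply a BufferingInputStreamStats.
A minimal sketch of the new pattern (MyStats and its bytesRead counter are
hypothetical, not part of this patch):

    import java.util.concurrent.atomic.AtomicLong;

    import org.apache.cassandra.spark.data.SSTable;
    import org.apache.cassandra.spark.stats.BufferingInputStreamStats;
    import org.apache.cassandra.spark.stats.Stats;
    import org.apache.cassandra.spark.utils.streaming.CassandraFileSource;

    // Hypothetical Stats subclass: Stats no longer takes a type parameter,
    // so the class-level generics disappear from user code.
    public class MyStats extends Stats
    {
        private final AtomicLong bytesRead = new AtomicLong();

        // BufferingInputStream instrumentation now lives behind its own
        // interface, returned from this factory method instead of being
        // inherited from the removed IStats interface.
        public BufferingInputStreamStats<SSTable> bufferingInputStreamStats()
        {
            return new BufferingInputStreamStats<SSTable>()
            {
                @Override
                public void inputStreamByteRead(CassandraFileSource<SSTable> ssTable,
                                                int length,
                                                int queueSize,
                                                int percentComplete)
                {
                    bytesRead.addAndGet(length); // record whatever metrics are needed
                }
            };
        }
    }

As in the updated EndToEndTests, the SSTable-specific return type narrows the
generic bufferingInputStreamStats() declared on Stats, which is expected to
compile with an unchecked-conversion warning.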
---
CHANGES.txt | 1 +
.../cassandra/spark/data/FileSystemSSTable.java | 6 +-
.../spark/stats/BufferingInputStreamStats.java | 131 +++++++++++++++++++++
.../org/apache/cassandra/spark/stats/IStats.java | 59 ----------
.../utils/streaming/BufferingInputStream.java | 8 +-
.../spark/bulkwriter/blobupload/SSTableLister.java | 2 +-
.../cassandra/spark/data/LocalDataLayer.java | 2 +-
.../spark/data/SidecarProvisionedSSTable.java | 2 +-
.../org/apache/cassandra/spark/EndToEndTests.java | 21 ++--
.../spark/utils/BufferingInputStreamHttpTest.java | 2 +-
.../spark/utils/BufferingInputStreamTests.java | 10 +-
.../org/apache/cassandra/spark/stats/Stats.java | 104 ++--------------
.../cassandra/spark/reader/SSTableReaderTests.java | 4 +-
13 files changed, 171 insertions(+), 181 deletions(-)
diff --git a/CHANGES.txt b/CHANGES.txt
index 01181ec..d9231b6 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
1.0.0
+ * Split out BufferingInputStream stats into separate interface (CASSANDRA-19778)
* Bump Sidecar version to 55a9efee (CASSANDRA-19774)
* Add new module cassandra-analytics-common to store common code with minimal dependencies (CASSANDRA-19748)
* Bulk writer fails validation stage when writing to a cluster using RandomPartitioner (CASSANDRA-19727)
diff --git a/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/data/FileSystemSSTable.java b/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/data/FileSystemSSTable.java
index ef41dc2..cffbc9a 100644
--- a/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/data/FileSystemSSTable.java
+++ b/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/data/FileSystemSSTable.java
@@ -30,7 +30,7 @@ import java.util.function.Supplier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.cassandra.spark.stats.IStats;
+import org.apache.cassandra.spark.stats.BufferingInputStreamStats;
import org.apache.cassandra.spark.utils.IOUtils;
import org.apache.cassandra.spark.utils.ThrowableUtils;
import org.apache.cassandra.spark.utils.streaming.BufferingInputStream;
@@ -44,9 +44,9 @@ public class FileSystemSSTable extends SSTable
private final transient Path dataFilePath;
private final transient boolean useBufferingInputStream;
- private final transient Supplier<IStats<SSTable>> stats;
+ private final transient Supplier<BufferingInputStreamStats<SSTable>> stats;
- public FileSystemSSTable(@NotNull Path dataFilePath, boolean useBufferingInputStream, @NotNull Supplier<IStats<SSTable>> stats)
+ public FileSystemSSTable(@NotNull Path dataFilePath, boolean useBufferingInputStream, @NotNull Supplier<BufferingInputStreamStats<SSTable>> stats)
{
this.dataFilePath = dataFilePath;
this.useBufferingInputStream = useBufferingInputStream;
diff --git a/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/stats/BufferingInputStreamStats.java b/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/stats/BufferingInputStreamStats.java
new file mode 100644
index 0000000..dbd5019
--- /dev/null
+++ b/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/stats/BufferingInputStreamStats.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.spark.stats;
+
+import org.apache.cassandra.spark.data.SSTable;
+import org.apache.cassandra.spark.utils.streaming.CassandraFile;
+import org.apache.cassandra.spark.utils.streaming.CassandraFileSource;
+
+/**
+ * Stats for {@link org.apache.cassandra.spark.utils.streaming.BufferingInputStream}.
+ * @param <T> CassandraFile type
+ */
+public interface BufferingInputStreamStats<T extends CassandraFile>
+{
+ static <T extends CassandraFile> BufferingInputStreamStats<T> doNothingStats()
+ {
+ return new BufferingInputStreamStats<T>()
+ {
+ };
+ }
+
+ /**
+ * When {@link org.apache.cassandra.spark.utils.streaming.BufferingInputStream} queue is full, usually indicating
+ * job is CPU-bound and blocked on the CompactionIterator
+ *
+ * @param ssTable the SSTable source for this input stream
+ */
+ default void inputStreamQueueFull(CassandraFileSource<? extends SSTable> ssTable)
+ {
+ }
+
+ /**
+ * Failure occurred in the {@link org.apache.cassandra.spark.utils.streaming.BufferingInputStream}
+ *
+ * @param ssTable the SSTable source for this input stream
+ * @param throwable throwable
+ */
+ default void inputStreamFailure(CassandraFileSource<T> ssTable, Throwable throwable)
+ {
+ }
+
+ /**
+ * Time the {@link org.apache.cassandra.spark.utils.streaming.BufferingInputStream} spent blocking on queue
+ * waiting for bytes. High time spent blocking indicates the job is network-bound, or blocked on the
+ * {@link org.apache.cassandra.spark.utils.streaming.CassandraFileSource} to supply the bytes.
+ *
+ * @param ssTable the SSTable source for this input stream
+ * @param nanos time in nanoseconds
+ */
+ default void inputStreamTimeBlocked(CassandraFileSource<T> ssTable, long nanos)
+ {
+ }
+
+ /**
+ * Bytes written to {@link org.apache.cassandra.spark.utils.streaming.BufferingInputStream}
+ * by the {@link org.apache.cassandra.spark.utils.streaming.CassandraFileSource}
+ *
+ * @param ssTable the SSTable source for this input stream
+ * @param length number of bytes written
+ */
+ default void inputStreamBytesWritten(CassandraFileSource<T> ssTable, int length)
+ {
+ }
+
+ /**
+ * Bytes read from {@link org.apache.cassandra.spark.utils.streaming.BufferingInputStream}
+ *
+ * @param ssTable the SSTable source for this input stream
+ * @param length number of bytes read
+ * @param queueSize current queue size
+ * @param percentComplete % completion
+ */
+ default void inputStreamByteRead(CassandraFileSource<T> ssTable,
+ int length,
+ int queueSize,
+ int percentComplete)
+ {
+ }
+
+ /**
+ * {@link org.apache.cassandra.spark.utils.streaming.CassandraFileSource} has finished writing
+ * to {@link org.apache.cassandra.spark.utils.streaming.BufferingInputStream} after reaching expected file length
+ *
+ * @param ssTable the SSTable source for this input stream
+ */
+ default void inputStreamEndBuffer(CassandraFileSource<T> ssTable)
+ {
+ }
+
+ /**
+ * {@link org.apache.cassandra.spark.utils.streaming.BufferingInputStream} finished and closed
+ *
+ * @param ssTable the SSTable source for this input stream
+ * @param runTimeNanos total time open in nanoseconds
+ * @param totalNanosBlocked total time blocked on queue waiting for bytes in nanoseconds
+ */
+ default void inputStreamEnd(CassandraFileSource<T> ssTable, long runTimeNanos, long totalNanosBlocked)
+ {
+ }
+
+ /**
+ * Called when the InputStream skips bytes
+ *
+ * @param ssTable the SSTable source for this input stream
+ * @param bufferedSkipped the number of bytes already buffered in memory skipped
+ * @param rangeSkipped the number of bytes skipped
+ * by efficiently incrementing the start range for the next request
+ */
+ default void inputStreamBytesSkipped(CassandraFileSource<T> ssTable,
+ long bufferedSkipped,
+ long rangeSkipped)
+ {
+ }
+}
diff --git a/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/stats/IStats.java b/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/stats/IStats.java
deleted file mode 100644
index 2d783f5..0000000
--- a/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/stats/IStats.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.cassandra.spark.stats;
-
-import org.apache.cassandra.spark.utils.streaming.CassandraFile;
-import org.apache.cassandra.spark.utils.streaming.CassandraFileSource;
-
-/**
- * Generic Stats interface that works across all CassandraFile FileTypes.
- *
- * @param <T>
- */
-public interface IStats<T extends CassandraFile>
-{
- default void inputStreamEnd(CassandraFileSource<T> source, long runTimeNanos, long totalNanosBlocked)
- {
- }
-
- default void inputStreamEndBuffer(CassandraFileSource<T> ssTable)
- {
- }
-
- default void inputStreamTimeBlocked(CassandraFileSource<T> source, long nanos)
- {
- }
-
- default void inputStreamByteRead(CassandraFileSource<T> source, int len, int queueSize, int percentComplete)
- {
- }
-
- default void inputStreamFailure(CassandraFileSource<T> source, Throwable t)
- {
- }
-
- default void inputStreamBytesWritten(CassandraFileSource<T> ssTable, int len)
- {
- }
-
- default void inputStreamBytesSkipped(CassandraFileSource<T> source, long bufferedSkipped, long rangeSkipped)
- {
- }
-}
diff --git a/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/utils/streaming/BufferingInputStream.java b/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/utils/streaming/BufferingInputStream.java
index 20f54c3..2e5f075 100644
--- a/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/utils/streaming/BufferingInputStream.java
+++ b/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/utils/streaming/BufferingInputStream.java
@@ -30,7 +30,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;
-import org.apache.cassandra.spark.stats.IStats;
+import org.apache.cassandra.spark.stats.BufferingInputStreamStats;
import org.apache.cassandra.spark.utils.ThrowableUtils;
import org.jetbrains.annotations.NotNull;
@@ -68,7 +68,7 @@ public class BufferingInputStream<T extends CassandraFile> extends InputStream i
private final BlockingQueue<StreamBuffer> queue;
private final CassandraFileSource<T> source;
- private final IStats<T> stats;
+ private final BufferingInputStreamStats<T> stats;
private final long startTimeNanos;
// Variables accessed by both producer, consumer & timeout thread so must be volatile or atomic
@@ -90,9 +90,9 @@ public class BufferingInputStream<T extends CassandraFile> extends InputStream i
/**
* @param source CassandraFileSource to async provide the bytes after {@link CassandraFileSource#request(long, long, StreamConsumer)} is called
*
- * @param stats {@link IStats} implementation for recording instrumentation
+ * @param stats {@link BufferingInputStreamStats} implementation for recording instrumentation
*/
- public BufferingInputStream(CassandraFileSource<T> source, IStats<T> stats)
+ public BufferingInputStream(CassandraFileSource<T> source, BufferingInputStreamStats<T> stats)
{
this.source = source;
this.queue = new LinkedBlockingQueue<>();
diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/blobupload/SSTableLister.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/blobupload/SSTableLister.java
index 8b7a227..2f3b546 100644
--- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/blobupload/SSTableLister.java
+++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/blobupload/SSTableLister.java
@@ -193,6 +193,6 @@ public class SSTableLister implements SSTableCollector
{
throw new IllegalArgumentException("SSTable should have only one data component");
}
- return new FileSystemSSTable(dataComponents.get(0), true, () -> Stats.DoNothingStats.INSTANCE);
+ return new FileSystemSSTable(dataComponents.get(0), true, Stats.DoNothingStats.INSTANCE::bufferingInputStreamStats);
}
}
diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/LocalDataLayer.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/LocalDataLayer.java
index e12c004..b0e6161 100644
--- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/LocalDataLayer.java
+++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/LocalDataLayer.java
@@ -324,7 +324,7 @@ public class LocalDataLayer extends DataLayer implements Serializable
.map(Paths::get)
.flatMap(Throwing.function(Files::list))
.filter(path -> path.getFileName().toString().endsWith("-" + FileType.DATA.getFileSuffix()))
- .map(path -> new FileSystemSSTable(path, useBufferingInputStream, this::stats))
+ .map(path -> new FileSystemSSTable(path, useBufferingInputStream, () -> this.stats.bufferingInputStreamStats()))
.collect(Collectors.toSet()));
}
diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/SidecarProvisionedSSTable.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/SidecarProvisionedSSTable.java
index f1b249c..418604f 100644
--- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/SidecarProvisionedSSTable.java
+++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/SidecarProvisionedSSTable.java
@@ -172,7 +172,7 @@ public class SidecarProvisionedSSTable extends SSTable
public InputStream open(ListSnapshotFilesResponse.FileInfo fileInfo, FileType fileType)
{
CassandraFileSource<SidecarProvisionedSSTable> ssTableSource = source(fileInfo, fileType);
- return new BufferingInputStream<>(ssTableSource, stats);
+ return new BufferingInputStream<>(ssTableSource, stats.bufferingInputStreamStats());
}
/**
diff --git a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/EndToEndTests.java b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/EndToEndTests.java
index c0e01e2..acf439e 100644
--- a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/EndToEndTests.java
+++ b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/EndToEndTests.java
@@ -49,6 +49,7 @@ import org.apache.cassandra.bridge.CassandraBridgeFactory;
import org.apache.cassandra.bridge.CassandraVersion;
import org.apache.cassandra.spark.data.CqlField;
import org.apache.cassandra.spark.data.SSTable;
+import org.apache.cassandra.spark.stats.BufferingInputStreamStats;
import org.apache.cassandra.spark.stats.Stats;
import org.apache.cassandra.spark.utils.RandomUtils;
import org.apache.cassandra.spark.utils.streaming.CassandraFileSource;
@@ -1948,7 +1949,7 @@ public class EndToEndTests
}
@SuppressWarnings("unused") // Actually used via reflection in testLargeBlobExclude()
- public static final Stats<SSTable> STATS = new Stats<SSTable>()
+ public static final Stats STATS = new Stats()
{
@Override
public void skippedBytes(long length)
@@ -1956,13 +1957,19 @@ public class EndToEndTests
skippedRawBytes.addAndGet(length);
}
- @Override
- public void inputStreamBytesSkipped(CassandraFileSource<SSTable> ssTable,
- long bufferedSkipped,
- long rangeSkipped)
+ public BufferingInputStreamStats<SSTable> bufferingInputStreamStats()
{
- skippedInputStreamBytes.addAndGet(bufferedSkipped);
- skippedRangeBytes.addAndGet(rangeSkipped);
+ return new BufferingInputStreamStats<SSTable>()
+ {
+ @Override
+ public void inputStreamBytesSkipped(CassandraFileSource<SSTable> ssTable,
+ long bufferedSkipped,
+ long rangeSkipped)
+ {
+ skippedInputStreamBytes.addAndGet(bufferedSkipped);
+ skippedRangeBytes.addAndGet(rangeSkipped);
+ }
+ };
}
};
diff --git a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/utils/BufferingInputStreamHttpTest.java b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/utils/BufferingInputStreamHttpTest.java
index c40a808..4c6505b 100644
--- a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/utils/BufferingInputStreamHttpTest.java
+++ b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/utils/BufferingInputStreamHttpTest.java
@@ -290,7 +290,7 @@ public class BufferingInputStreamHttpTest
Files.size(path),
maxBufferSize,
chunkBufferSize);
- try (BufferingInputStream<SSTable> is = new BufferingInputStream<>(source, BufferingInputStreamTests.STATS))
+ try (BufferingInputStream<SSTable> is = new BufferingInputStream<>(source, BufferingInputStreamTests.STATS.bufferingInputStreamStats()))
{
actualMD5 = DigestUtils.md5(is);
blockingTimeMillis = TimeUnit.MILLISECONDS.convert(is.timeBlockedNanos(), TimeUnit.NANOSECONDS);
diff --git a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/utils/BufferingInputStreamTests.java b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/utils/BufferingInputStreamTests.java
index 6cdefda..2652e3f 100644
--- a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/utils/BufferingInputStreamTests.java
+++ b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/utils/BufferingInputStreamTests.java
@@ -140,7 +140,7 @@ public class BufferingInputStreamTests
requestCount.incrementAndGet();
writeBuffers(consumer, randomBuffers(chunksPerRequest));
}, null);
- BufferingInputStream<SSTable> is = new BufferingInputStream<>(mockedClient, STATS);
+ BufferingInputStream<SSTable> is = new BufferingInputStream<>(mockedClient, STATS.bufferingInputStreamStats());
readStreamFully(is);
assertEquals(numRequests, requestCount.get());
assertEquals(0L, is.bytesBuffered());
@@ -170,7 +170,7 @@ public class BufferingInputStreamTests
}
}, null);
assertThrows(IOException.class,
- () -> readStreamFully(new BufferingInputStream<>(source, STATS))
+ () -> readStreamFully(new BufferingInputStream<>(source, STATS.bufferingInputStreamStats()))
);
}
@@ -220,7 +220,7 @@ public class BufferingInputStreamTests
});
}
}, timeout);
- BufferingInputStream<SSTable> inputStream = new BufferingInputStream<>(source, STATS);
+ BufferingInputStream<SSTable> inputStream = new BufferingInputStream<>(source, STATS.bufferingInputStreamStats());
try
{
readStreamFully(inputStream);
@@ -292,7 +292,7 @@ public class BufferingInputStreamTests
int bytesToRead = chunkSize * numChunks;
long skipAhead = size - bytesToRead + 1;
- try (BufferingInputStream<SSTable> stream = new BufferingInputStream<>(source, STATS))
+ try (BufferingInputStream<SSTable> stream = new BufferingInputStream<>(source, STATS.bufferingInputStreamStats()))
{
// Skip ahead so we only read the final chunks
ByteBufferUtils.skipFully(stream, skipAhead);
@@ -341,7 +341,7 @@ public class BufferingInputStreamTests
}
};
- try (BufferingInputStream<SSTable> stream = new BufferingInputStream<>(source, STATS))
+ try (BufferingInputStream<SSTable> stream = new BufferingInputStream<>(source, STATS.bufferingInputStreamStats()))
{
ByteBufferUtils.skipFully(stream, 20971520);
readStreamFully(stream);
diff --git a/cassandra-bridge/src/main/java/org/apache/cassandra/spark/stats/Stats.java b/cassandra-bridge/src/main/java/org/apache/cassandra/spark/stats/Stats.java
index e280a74..eca2fcb 100644
--- a/cassandra-bridge/src/main/java/org/apache/cassandra/spark/stats/Stats.java
+++ b/cassandra-bridge/src/main/java/org/apache/cassandra/spark/stats/Stats.java
@@ -30,11 +30,11 @@ import org.apache.cassandra.spark.data.SSTablesSupplier;
import org.apache.cassandra.spark.reader.IndexEntry;
import org.apache.cassandra.spark.sparksql.filters.PartitionKeyFilter;
import org.apache.cassandra.spark.sparksql.filters.SparkRangeFilter;
-import org.apache.cassandra.spark.utils.streaming.CassandraFileSource;
+import org.apache.cassandra.spark.utils.streaming.CassandraFile;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
-public abstract class Stats<T extends SSTable> implements IStats<T>
+public abstract class Stats
{
public static class DoNothingStats extends Stats
@@ -42,6 +42,11 @@ public abstract class Stats<T extends SSTable> implements IStats<T>
public static final DoNothingStats INSTANCE = new DoNothingStats();
}
+ public <T extends CassandraFile> BufferingInputStreamStats<T> bufferingInputStreamStats()
+ {
+ return BufferingInputStreamStats.doNothingStats();
+ }
+
// Spark Row Iterator
/**
@@ -357,101 +362,6 @@ public abstract class Stats<T extends SSTable> implements IStats<T>
{
}
- // SSTable Input Stream
-
- /**
- * When {@link org.apache.cassandra.spark.utils.streaming.BufferingInputStream} queue is full, usually indicating
- * job is CPU-bound and blocked on the CompactionIterator
- *
- * @param ssTable the SSTable source for this input stream
- */
- public void inputStreamQueueFull(CassandraFileSource<? extends SSTable> ssTable)
- {
- }
-
- /**
- * Failure occurred in the {@link org.apache.cassandra.spark.utils.streaming.BufferingInputStream}
- *
- * @param ssTable the SSTable source for this input stream
- * @param throwable throwable
- */
- public void inputStreamFailure(CassandraFileSource<T> ssTable, Throwable throwable)
- {
- }
-
- /**
- * Time the {@link org.apache.cassandra.spark.utils.streaming.BufferingInputStream} spent blocking on queue
- * waiting for bytes. High time spent blocking indicates the job is network-bound, or blocked on the
- * {@link org.apache.cassandra.spark.utils.streaming.CassandraFileSource} to supply the bytes.
- *
- * @param ssTable the SSTable source for this input stream
- * @param nanos time in nanoseconds
- */
- public void inputStreamTimeBlocked(CassandraFileSource<T> ssTable, long nanos)
- {
- }
-
- /**
- * Bytes written to {@link org.apache.cassandra.spark.utils.streaming.BufferingInputStream}
- * by the {@link org.apache.cassandra.spark.utils.streaming.CassandraFileSource}
- *
- * @param ssTable the SSTable source for this input stream
- * @param length number of bytes written
- */
- public void inputStreamBytesWritten(CassandraFileSource<T> ssTable, int length)
- {
- }
-
- /**
- * Bytes read from {@link org.apache.cassandra.spark.utils.streaming.BufferingInputStream}
- *
- * @param ssTable the SSTable source for this input stream
- * @param length number of bytes read
- * @param queueSize current queue size
- * @param percentComplete % completion
- */
- public void inputStreamByteRead(CassandraFileSource<T> ssTable,
- int length,
- int queueSize,
- int percentComplete)
- {
- }
-
- /**
- * {@link org.apache.cassandra.spark.utils.streaming.CassandraFileSource} has finished writing
- * to {@link org.apache.cassandra.spark.utils.streaming.BufferingInputStream} after reaching expected file length
- *
- * @param ssTable the SSTable source for this input stream
- */
- public void inputStreamEndBuffer(CassandraFileSource<T> ssTable)
- {
- }
-
- /**
- * {@link org.apache.cassandra.spark.utils.streaming.BufferingInputStream} finished and closed
- *
- * @param ssTable the SSTable source for this input stream
- * @param runTimeNanos total time open in nanoseconds
- * @param totalNanosBlocked total time blocked on queue waiting for bytes in nanoseconds
- */
- public void inputStreamEnd(CassandraFileSource<T> ssTable, long runTimeNanos, long totalNanosBlocked)
- {
- }
-
- /**
- * Called when the InputStream skips bytes
- *
- * @param ssTable the SSTable source for this input stream
- * @param bufferedSkipped the number of bytes already buffered in memory skipped
- * @param rangeSkipped the number of bytes skipped
- * by efficiently incrementing the start range for the next request
- */
- public void inputStreamBytesSkipped(CassandraFileSource<T> ssTable,
- long bufferedSkipped,
- long rangeSkipped)
- {
- }
-
// PartitionSizeIterator stats
/**
diff --git a/cassandra-four-zero-bridge/src/test/java/org/apache/cassandra/spark/reader/SSTableReaderTests.java b/cassandra-four-zero-bridge/src/test/java/org/apache/cassandra/spark/reader/SSTableReaderTests.java
index 04cbc24..d10ccbc 100644
--- a/cassandra-four-zero-bridge/src/test/java/org/apache/cassandra/spark/reader/SSTableReaderTests.java
+++ b/cassandra-four-zero-bridge/src/test/java/org/apache/cassandra/spark/reader/SSTableReaderTests.java
@@ -592,7 +592,7 @@ public class SSTableReaderTests
AtomicBoolean pass = new AtomicBoolean(true);
AtomicInteger skipCount = new AtomicInteger(0);
- Stats<SSTable> stats = new Stats<SSTable>()
+ Stats stats = new Stats()
{
@Override
public void skippedSSTable(@Nullable SparkRangeFilter sparkRangeFilter,
@@ -652,7 +652,7 @@ public class SSTableReaderTests
AtomicBoolean pass = new AtomicBoolean(true);
AtomicInteger skipCount = new AtomicInteger(0);
- Stats<SSTable> stats = new Stats<SSTable>()
+ Stats stats = new Stats()
{
@Override
public void skippedSSTable(@Nullable SparkRangeFilter sparkRangeFilter,
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]