jberragan commented on code in PR #97:
URL: 
https://github.com/apache/cassandra-analytics/pull/97#discussion_r1934711506


##########
cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/sparksql/SparkCellIterator.java:
##########
@@ -19,115 +19,50 @@
 
 package org.apache.cassandra.spark.sparksql;
 
-import java.io.IOException;
+import java.math.BigInteger;
 import java.nio.ByteBuffer;
 import java.util.Arrays;
-import java.util.Iterator;
 import java.util.List;
-import java.util.Objects;
 import java.util.stream.Collectors;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import org.apache.cassandra.spark.data.CqlField;
 import org.apache.cassandra.spark.data.CqlTable;
 import org.apache.cassandra.spark.data.DataLayer;
-import org.apache.cassandra.spark.data.converter.types.SparkType;
-import org.apache.cassandra.spark.reader.RowData;
 import org.apache.cassandra.spark.data.converter.SparkSqlTypeConverter;
-import org.apache.cassandra.spark.reader.StreamScanner;
+import org.apache.cassandra.spark.data.converter.types.SparkType;
 import org.apache.cassandra.spark.sparksql.filters.PartitionKeyFilter;
 import org.apache.cassandra.spark.sparksql.filters.PruneColumnFilter;
-import org.apache.cassandra.analytics.stats.Stats;
-import org.apache.cassandra.spark.utils.ByteBufferUtils;
 import org.apache.cassandra.spark.utils.FastThreadLocalUtf8Decoder;
 import org.apache.spark.sql.types.StructField;
 import org.apache.spark.sql.types.StructType;
 import org.jetbrains.annotations.NotNull;
 import org.jetbrains.annotations.Nullable;
 
-/**
- * Iterate through CompactionIterator, deserializing ByteBuffers and 
normalizing into Object[] array in column order
- */
-public class SparkCellIterator implements Iterator<Cell>, AutoCloseable
+public class SparkCellIterator extends CellIterator
 {
-    private static final Logger LOGGER = 
LoggerFactory.getLogger(SparkCellIterator.class);
-
-    protected final DataLayer dataLayer;
-    private final Stats stats;
-    private final CqlTable cqlTable;
-    private final Object[] values;
+    private final DataLayer dataLayer;
     private final SparkType[] sparkTypes;
-    @Nullable
-    protected final PruneColumnFilter columnFilter;
-    private final long startTimeNanos;
-    @NotNull
-    private final StreamScanner<RowData> scanner;
-    @NotNull
-    private final RowData rowData;
-
-    // Mutable Iterator State
-    private boolean skipPartition = false;
-    private boolean newRow = false;
-    private boolean closed = false;
-    private Cell next = null;
-    private long previousTimeNanos;
-
-    protected final int partitionId;
-    protected final int firstProjectedValueColumnPositionOrZero;
-    protected final boolean hasProjectedValueColumns;
-    private final SparkSqlTypeConverter sparkSqlTypeConverter;
 
     public SparkCellIterator(int partitionId,
                              @NotNull DataLayer dataLayer,
                              @Nullable StructType requiredSchema,
                              @NotNull List<PartitionKeyFilter> 
partitionKeyFilters)
     {
-        this.partitionId = partitionId;
+        super(partitionId,
+              dataLayer.cqlTable(),
+              dataLayer.stats(),
+              dataLayer.typeConverter(),
+              partitionKeyFilters,
+              (cqlTable) -> buildColumnFilter(requiredSchema, cqlTable),
+              dataLayer::openCompactionScanner);
         this.dataLayer = dataLayer;
-        stats = dataLayer.stats();
-        cqlTable = dataLayer.cqlTable();
-        columnFilter = buildColumnFilter(requiredSchema, cqlTable);
-        if (columnFilter != null)
-        {
-            LOGGER.info("Adding prune column filter columns='{}'", 
String.join(",", columnFilter.requiredColumns()));
-        }
-
-        hasProjectedValueColumns = cqlTable.numValueColumns() > 0 &&
-                                   cqlTable.valueColumns()
-                                           .stream()
-                                           .anyMatch(field -> columnFilter == 
null || columnFilter.requiredColumns().contains(field.name()));
-
-        // The value array copies across all the partition/clustering/static 
columns
-        // and the single column value for this cell to the SparkRowIterator
-        values = new Object[cqlTable.numNonValueColumns() + 
(hasProjectedValueColumns ? 1 : 0)];
-
-        // Open compaction scanner
-        startTimeNanos = System.nanoTime();
-        previousTimeNanos = startTimeNanos;
-        scanner = openScanner(partitionId, partitionKeyFilters);
-        long openTimeNanos = System.nanoTime() - startTimeNanos;
-        LOGGER.info("Opened CompactionScanner runtimeNanos={}", openTimeNanos);
-        stats.openedCompactionScanner(openTimeNanos);
-        rowData = scanner.data();
-        stats.openedSparkCellIterator();
-        firstProjectedValueColumnPositionOrZero = 
maybeGetPositionOfFirstProjectedValueColumnOrZero();
-
-        sparkSqlTypeConverter = dataLayer.typeConverter();
-        sparkTypes = new SparkType[cqlTable.numFields()];
+        this.sparkTypes = new SparkType[cqlTable.numFields()];
         for (int index = 0; index < cqlTable.numFields(); index++)
         {
-            sparkTypes[index] = 
sparkSqlTypeConverter.toSparkType(cqlTable.field(index).type());
+            this.sparkTypes[index] = ((SparkSqlTypeConverter) 
this.typeConverter).toSparkType(cqlTable.field(index).type());

Review Comment:
   Makes sense



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org

Reply via email to