jberragan commented on code in PR #97: URL: https://github.com/apache/cassandra-analytics/pull/97#discussion_r1934711506
########## cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/sparksql/SparkCellIterator.java: ########## @@ -19,115 +19,50 @@ package org.apache.cassandra.spark.sparksql; -import java.io.IOException; +import java.math.BigInteger; import java.nio.ByteBuffer; import java.util.Arrays; -import java.util.Iterator; import java.util.List; -import java.util.Objects; import java.util.stream.Collectors; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import org.apache.cassandra.spark.data.CqlField; import org.apache.cassandra.spark.data.CqlTable; import org.apache.cassandra.spark.data.DataLayer; -import org.apache.cassandra.spark.data.converter.types.SparkType; -import org.apache.cassandra.spark.reader.RowData; import org.apache.cassandra.spark.data.converter.SparkSqlTypeConverter; -import org.apache.cassandra.spark.reader.StreamScanner; +import org.apache.cassandra.spark.data.converter.types.SparkType; import org.apache.cassandra.spark.sparksql.filters.PartitionKeyFilter; import org.apache.cassandra.spark.sparksql.filters.PruneColumnFilter; -import org.apache.cassandra.analytics.stats.Stats; -import org.apache.cassandra.spark.utils.ByteBufferUtils; import org.apache.cassandra.spark.utils.FastThreadLocalUtf8Decoder; import org.apache.spark.sql.types.StructField; import org.apache.spark.sql.types.StructType; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; -/** - * Iterate through CompactionIterator, deserializing ByteBuffers and normalizing into Object[] array in column order - */ -public class SparkCellIterator implements Iterator<Cell>, AutoCloseable +public class SparkCellIterator extends CellIterator { - private static final Logger LOGGER = LoggerFactory.getLogger(SparkCellIterator.class); - - protected final DataLayer dataLayer; - private final Stats stats; - private final CqlTable cqlTable; - private final Object[] values; + private final DataLayer dataLayer; private final SparkType[] sparkTypes; - @Nullable - protected final PruneColumnFilter columnFilter; - private final long startTimeNanos; - @NotNull - private final StreamScanner<RowData> scanner; - @NotNull - private final RowData rowData; - - // Mutable Iterator State - private boolean skipPartition = false; - private boolean newRow = false; - private boolean closed = false; - private Cell next = null; - private long previousTimeNanos; - - protected final int partitionId; - protected final int firstProjectedValueColumnPositionOrZero; - protected final boolean hasProjectedValueColumns; - private final SparkSqlTypeConverter sparkSqlTypeConverter; public SparkCellIterator(int partitionId, @NotNull DataLayer dataLayer, @Nullable StructType requiredSchema, @NotNull List<PartitionKeyFilter> partitionKeyFilters) { - this.partitionId = partitionId; + super(partitionId, + dataLayer.cqlTable(), + dataLayer.stats(), + dataLayer.typeConverter(), + partitionKeyFilters, + (cqlTable) -> buildColumnFilter(requiredSchema, cqlTable), + dataLayer::openCompactionScanner); this.dataLayer = dataLayer; - stats = dataLayer.stats(); - cqlTable = dataLayer.cqlTable(); - columnFilter = buildColumnFilter(requiredSchema, cqlTable); - if (columnFilter != null) - { - LOGGER.info("Adding prune column filter columns='{}'", String.join(",", columnFilter.requiredColumns())); - } - - hasProjectedValueColumns = cqlTable.numValueColumns() > 0 && - cqlTable.valueColumns() - .stream() - .anyMatch(field -> columnFilter == null || columnFilter.requiredColumns().contains(field.name())); - - // The value array copies across all the partition/clustering/static columns - // and the single column value for this cell to the SparkRowIterator - values = new Object[cqlTable.numNonValueColumns() + (hasProjectedValueColumns ? 1 : 0)]; - - // Open compaction scanner - startTimeNanos = System.nanoTime(); - previousTimeNanos = startTimeNanos; - scanner = openScanner(partitionId, partitionKeyFilters); - long openTimeNanos = System.nanoTime() - startTimeNanos; - LOGGER.info("Opened CompactionScanner runtimeNanos={}", openTimeNanos); - stats.openedCompactionScanner(openTimeNanos); - rowData = scanner.data(); - stats.openedSparkCellIterator(); - firstProjectedValueColumnPositionOrZero = maybeGetPositionOfFirstProjectedValueColumnOrZero(); - - sparkSqlTypeConverter = dataLayer.typeConverter(); - sparkTypes = new SparkType[cqlTable.numFields()]; + this.sparkTypes = new SparkType[cqlTable.numFields()]; for (int index = 0; index < cqlTable.numFields(); index++) { - sparkTypes[index] = sparkSqlTypeConverter.toSparkType(cqlTable.field(index).type()); + this.sparkTypes[index] = ((SparkSqlTypeConverter) this.typeConverter).toSparkType(cqlTable.field(index).type()); Review Comment: Makes sense -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org