yifan-c commented on code in PR #71:
URL: https://github.com/apache/cassandra-analytics/pull/71#discussion_r1715753096
##########
cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/data/CqlField.java:
##########
@@ -35,19 +35,16 @@
import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
-import org.apache.cassandra.bridge.BigNumberConfig;
import org.apache.cassandra.bridge.CassandraVersion;
import org.apache.cassandra.spark.utils.RandomUtils;
-import org.apache.spark.sql.Row;
-import org.apache.spark.sql.catalyst.expressions.GenericInternalRow;
-import org.apache.spark.sql.types.DataType;
import org.jetbrains.annotations.NotNull;
@SuppressWarnings({ "WeakerAccess", "unused" })
public class CqlField implements Serializable, Comparable<CqlField>
{
private static final long serialVersionUID = 42L;
+ private static final Comparator<String> STRING_COMPARATOR = String::compareTo;
Review Comment:
Only `STRING_COMPARATOR` is `private`.
##########
cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/data/CqlField.java:
##########
@@ -222,12 +210,17 @@ public interface CqlTuple extends CqlCollection
{
ByteBuffer serializeTuple(Object[] values);
- Object[] deserializeTuple(ByteBuffer buffer, boolean isFrozen);
+ Object[] deserializeTuple(TypeConverter converter, ByteBuffer buffer, boolean isFrozen);
Review Comment:
Please add javadoc for the API methods touched in this patch.
##########
cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/sparksql/SparkCellIterator.java:
##########
@@ -325,7 +329,10 @@ private void maybeRebuildClusteringKeys(@NotNull ByteBuffer columnNameBuf)
{
Object newObj = deserialize(field, ByteBufferUtils.extractComponent(columnNameBuf, index++));
Object oldObj = values[field.position()];
- if (newRow || oldObj == null || newObj == null || !field.equals(newObj, oldObj))
+ // Historically, we compare equality of clustering keys using the Spark types
+ // to determine if we have moved to a new 'row'. We could also compare using the Cassandra types
+ // or the raw ByteBuffers before converting to Spark types.
+ if (newRow || oldObj == null || newObj == null || !sparkSqlTypeConverter.toSparkType(field.type()).equals(newObj, oldObj))
Review Comment:
Question: is the comment a TODO (i.e., should we move to comparing using the Cassandra types or the raw bytes)?
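For comparison, a minimal sketch of the raw-bytes alternative mentioned in the code comment. It assumes every clustering value has a single canonical serialized form, so equal values always produce equal bytes; `RawClusteringKeyComparison` and `isNewRow` are hypothetical names, not part of the PR.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical helper, not part of the PR. ASSUMPTION: each clustering value has a single
// canonical serialized form, so equal values always serialize to equal bytes.
public final class RawClusteringKeyComparison
{
    private RawClusteringKeyComparison()
    {
    }

    // Mirrors the condition in maybeRebuildClusteringKeys, but compares the raw
    // serialized components instead of the values converted to Spark types.
    public static boolean isNewRow(boolean newRow, ByteBuffer oldComponent, ByteBuffer newComponent)
    {
        // ByteBuffer.equals compares only the remaining bytes (position to limit),
        // independent of the backing array or starting offset.
        return newRow
               || oldComponent == null
               || newComponent == null
               || !oldComponent.equals(newComponent);
    }

    public static void main(String[] args)
    {
        ByteBuffer a = ByteBuffer.wrap("ck1".getBytes(StandardCharsets.UTF_8));
        ByteBuffer b = ByteBuffer.wrap("ck1".getBytes(StandardCharsets.UTF_8));
        ByteBuffer c = ByteBuffer.wrap("ck2".getBytes(StandardCharsets.UTF_8));
        System.out.println(isNewRow(false, a, b)); // false: identical bytes, same row
        System.out.println(isNewRow(false, a, c)); // true: bytes differ, new row
    }
}
```

The trade-off: this skips the per-component type conversion, but ties row detection to the serialization format, whereas the current Spark-type comparison does not depend on it.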
##########
cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/data/CqlField.java:
##########
@@ -201,6 +184,11 @@ public interface CqlCollection extends CqlType
CqlField.CqlType type();
CqlField.CqlType type(int position);
+
+ default boolean isComplex()
Review Comment:
add javadoc
##########
cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/data/CqlField.java:
##########
@@ -473,6 +462,11 @@ public void write(Kryo kryo, Output output, CqlField field)
public static UnsupportedOperationException notImplemented(CqlType type)
{
- return new UnsupportedOperationException(type.toString() + " type not implemented");
+ return notImplemented(type.toString());
+ }
+
+ public static UnsupportedOperationException notImplemented(String type)
+ {
+ return new UnsupportedOperationException(type + " type not implemented");
}
Review Comment:
The exception message only makes sense for types. Can you make the `String` overload `private`, to avoid it being used in other contexts?
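A minimal sketch of the suggestion, assuming the `String` overload is only needed from inside `CqlField` (if other modules call it, they would need a different entry point). `NotImplementedSketch` and its nested `CqlType` are hypothetical stand-ins for the real classes.

```java
// Hypothetical stand-in for CqlField; only the two notImplemented overloads are modeled.
public final class NotImplementedSketch
{
    // Hypothetical stand-in for CqlField.CqlType; only toString() matters here.
    public interface CqlType
    {
    }

    private NotImplementedSketch()
    {
    }

    // Public entry point: callers can only build this exception from a CQL type.
    public static UnsupportedOperationException notImplemented(CqlType type)
    {
        return notImplemented(type.toString());
    }

    // Private: the type-specific message cannot be reused for unrelated errors
    // outside this class.
    private static UnsupportedOperationException notImplemented(String type)
    {
        return new UnsupportedOperationException(type + " type not implemented");
    }

    public static void main(String[] args)
    {
        CqlType duration = new CqlType()
        {
            @Override
            public String toString()
            {
                return "duration";
            }
        };
        System.out.println(notImplemented(duration).getMessage()); // duration type not implemented
    }
}
```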
##########
cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/data/TypeConverter.java:
##########
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.spark.data;
+
+import org.jetbrains.annotations.NotNull;
+
+public interface TypeConverter
Review Comment:
how about `SparkTypeConverter`?
##########
cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/data/CqlField.java:
##########
@@ -330,14 +329,39 @@ public CqlType type()
return type;
}
- public Object deserialize(ByteBuffer buffer)
+ public Object deserializeToType(TypeConverter converter, ByteBuffer buffer)
+ {
+ return deserializeToType(converter, buffer, false);
+ }
+
+ /**
+ * Deserialize raw ByteBuffer from Cassandra type and convert to a new type using the TypeConverter.
+ *
+ * @param converter custom TypeConverter that maps Cassandra type to some other type.
+ * @param buffer raw ByteBuffer
+ * @param isFrozen true if the Cassandra type is frozen
+ * @return deserialized object converted to custom type.
+ */
+ public Object deserializeToType(TypeConverter converter, ByteBuffer buffer, boolean isFrozen)
+ {
+ return type().deserializeToType(converter, buffer, isFrozen);
+ }
+
+ public Object deserializeToJava(ByteBuffer buffer)
Review Comment:
nit: `deserializeToJavaType`
##########
cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/data/TypeConverter.java:
##########
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.spark.data;
+
+import org.jetbrains.annotations.NotNull;
+
+public interface TypeConverter
+{
+ /**
+ * Converts Cassandra native value to desired equivalent.
Review Comment:
Not sure what "Cassandra native value" means here. Is the `Object value` actually a Java type value?
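One way to settle this in the javadoc itself is to spell out the input contract. A hypothetical sketch, assuming the value passed in is the object already produced by deserializing the Cassandra bytes into its Java type (e.g. `java.util.UUID` for `uuid`). `JavaValueConverter`, its `convert(String, Object)` signature and the UUID-to-String mapping are illustrative only, not the PR's actual interface or the Spark bridge's real type mapping.

```java
import java.util.UUID;

public final class TypeConverterSketch
{
    /**
     * Hypothetical shape of the interface (not the PR's actual signature).
     */
    public interface JavaValueConverter
    {
        /**
         * Converts a value that has already been deserialized into its Java type
         * (for example java.util.UUID for a CQL uuid) into the target representation.
         *
         * @param cqlType CQL type name of the column, e.g. "uuid"
         * @param value   deserialized Java value, never null
         * @return the value in the target representation
         */
        Object convert(String cqlType, Object value);
    }

    // Toy implementation (ASSUMPTION, illustrative only): render UUIDs as strings,
    // pass every other value through unchanged.
    public static final JavaValueConverter UUID_AS_STRING = (cqlType, value) ->
        value instanceof UUID ? value.toString() : value;

    public static void main(String[] args)
    {
        UUID id = UUID.fromString("123e4567-e89b-12d3-a456-426614174000");
        System.out.println(UUID_AS_STRING.convert("uuid", id)); // 123e4567-e89b-12d3-a456-426614174000
        System.out.println(UUID_AS_STRING.convert("int", 42));  // 42
    }
}
```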
##########
cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/data/CqlField.java:
##########
@@ -330,14 +329,39 @@ public CqlType type()
return type;
}
- public Object deserialize(ByteBuffer buffer)
+ public Object deserializeToType(TypeConverter converter, ByteBuffer buffer)
+ {
+ return deserializeToType(converter, buffer, false);
+ }
+
+ /**
+ * Deserialize raw ByteBuffer from Cassandra type and convert to a new type using the TypeConverter.
+ *
+ * @param converter custom TypeConverter that maps Cassandra type to some other type.
+ * @param buffer raw ByteBuffer
+ * @param isFrozen true if the Cassandra type is frozen
+ * @return deserialized object converted to custom type.
+ */
+ public Object deserializeToType(TypeConverter converter, ByteBuffer buffer, boolean isFrozen)
Review Comment:
nit: `deserializeToSparkType` to better distinguish?
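To illustrate how the suggested pair of names (`deserializeToJavaType` / `deserializeToSparkType`) reads next to the non-frozen convenience overload from the diff, here is a toy sketch for a single text column. `TextFieldSketch` and its `Converter` are hypothetical; the real `CqlField` delegates to `type().deserializeToType(...)` rather than decoding UTF-8 itself.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical stand-in for a CqlField holding a CQL text column.
public final class TextFieldSketch
{
    // Hypothetical converter: maps the Java value to the target (e.g. Spark) representation.
    public interface Converter
    {
        Object convert(Object javaValue);
    }

    // Raw bytes -> Java type (a String for a text column in this toy example).
    public Object deserializeToJavaType(ByteBuffer buffer)
    {
        // duplicate() so decoding does not move the caller's buffer position
        return StandardCharsets.UTF_8.decode(buffer.duplicate()).toString();
    }

    // Convenience overload: non-frozen by default, mirroring deserializeToType in the diff.
    public Object deserializeToSparkType(Converter converter, ByteBuffer buffer)
    {
        return deserializeToSparkType(converter, buffer, false);
    }

    public Object deserializeToSparkType(Converter converter, ByteBuffer buffer, boolean isFrozen)
    {
        // isFrozen only matters for collections/UDTs; this toy text column ignores it.
        return converter.convert(deserializeToJavaType(buffer));
    }
}
```

With distinct names, a call site makes it obvious whether it gets the Java value or the converted one, without having to check which overload takes a converter.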
--
This is an automated message from the Apache Git Service.