johnjcasey commented on code in PR #24424:
URL: https://github.com/apache/beam/pull/24424#discussion_r1044853143


##########
sdks/java/io/clickhouse/src/main/java/org/apache/beam/sdk/io/clickhouse/ClickHouseWriter.java:
##########
@@ -39,19 +41,19 @@ public class ClickHouseWriter {
   private static final Instant EPOCH_INSTANT = new Instant(0L);
 
   @SuppressWarnings("unchecked")
-  static void writeNullableValue(
-      ClickHouseRowBinaryStream stream, ColumnType columnType, Object value) 
throws IOException {
+  static void writeNullableValue(ClickHouseOutputStream stream, ColumnType 
columnType, Object value)
+      throws IOException {
 
     if (value == null) {
-      stream.markNextNullable(true);
+      BinaryStreamUtils.writeNull(stream);
     } else {
-      stream.markNextNullable(false);
+      BinaryStreamUtils.writeNonNull(stream);

Review Comment:
   This API looks strange to me. Do you really need to manually write a null 
bit to the stream?



##########
sdks/java/io/clickhouse/src/main/java/org/apache/beam/sdk/io/clickhouse/ClickHouseWriter.java:
##########
@@ -39,19 +41,19 @@ public class ClickHouseWriter {
   private static final Instant EPOCH_INSTANT = new Instant(0L);
 
   @SuppressWarnings("unchecked")
-  static void writeNullableValue(
-      ClickHouseRowBinaryStream stream, ColumnType columnType, Object value) 
throws IOException {
+  static void writeNullableValue(ClickHouseOutputStream stream, ColumnType 
columnType, Object value)
+      throws IOException {
 
     if (value == null) {
-      stream.markNextNullable(true);
+      BinaryStreamUtils.writeNull(stream);
     } else {
-      stream.markNextNullable(false);
+      BinaryStreamUtils.writeNonNull(stream);
       writeValue(stream, columnType, value);
     }
   }
 
   @SuppressWarnings("unchecked")
-  static void writeValue(ClickHouseRowBinaryStream stream, ColumnType 
columnType, Object value)
+  static void writeValue(ClickHouseOutputStream stream, ColumnType columnType, 
Object value)

Review Comment:
   At this point, I believe `value` is known to be non-null, so we should be able 
to mark it as such and remove the suppressed warning.



##########
sdks/java/io/clickhouse/src/main/java/org/apache/beam/sdk/io/clickhouse/ClickHouseWriter.java:
##########
@@ -68,98 +70,98 @@ static void writeValue(ClickHouseRowBinaryStream stream, 
ColumnType columnType,
         break;
 
       case FLOAT32:
-        stream.writeFloat32((Float) value);
+        BinaryStreamUtils.writeFloat32(stream, (Float) value);
         break;
 
       case FLOAT64:
-        stream.writeFloat64((Double) value);
+        BinaryStreamUtils.writeFloat64(stream, (Double) value);
         break;
 
       case INT8:
-        stream.writeInt8((Byte) value);
+        BinaryStreamUtils.writeInt8(stream, (Byte) value);
         break;
 
       case INT16:
-        stream.writeInt16((Short) value);
+        BinaryStreamUtils.writeInt16(stream, (Short) value);
         break;
 
       case INT32:
-        stream.writeInt32((Integer) value);
+        BinaryStreamUtils.writeInt32(stream, (Integer) value);
         break;
 
       case INT64:
-        stream.writeInt64((Long) value);
+        BinaryStreamUtils.writeInt64(stream, (Long) value);
         break;
 
       case STRING:
-        stream.writeString((String) value);
+        BinaryStreamUtils.writeString(stream, (String) value);
         break;
 
       case UINT8:
-        stream.writeUInt8((Short) value);
+        BinaryStreamUtils.writeUnsignedInt8(stream, (Short) value);
         break;
 
       case UINT16:
-        stream.writeUInt16((Integer) value);
+        BinaryStreamUtils.writeUnsignedInt16(stream, (Integer) value);
         break;
 
       case UINT32:
-        stream.writeUInt32((Long) value);
+        BinaryStreamUtils.writeUnsignedInt32(stream, (Long) value);
         break;
 
       case UINT64:
-        stream.writeUInt64((Long) value);
+        BinaryStreamUtils.writeUnsignedInt64(stream, (Long) value);
         break;
 
       case ENUM8:
         Integer enum8 = columnType.enumValues().get((String) value);
         Preconditions.checkNotNull(
             enum8,
             "unknown enum value '" + value + "', possible values: " + 
columnType.enumValues());
-        stream.writeInt8(enum8);
+        BinaryStreamUtils.writeInt8(stream, enum8);
         break;
 
       case ENUM16:
         Integer enum16 = columnType.enumValues().get((String) value);
         Preconditions.checkNotNull(
             enum16,
             "unknown enum value '" + value + "', possible values: " + 
columnType.enumValues());
-        stream.writeInt16(enum16);
+        BinaryStreamUtils.writeInt16(stream, enum16);
         break;
 
       case DATE:
         Days epochDays = Days.daysBetween(EPOCH_INSTANT, (ReadableInstant) 
value);
-        stream.writeUInt16(epochDays.getDays());
+        BinaryStreamUtils.writeUnsignedInt16(stream, epochDays.getDays());
         break;
 
       case DATETIME:
         long epochSeconds = ((ReadableInstant) value).getMillis() / 1000L;
-        stream.writeUInt32(epochSeconds);
+        BinaryStreamUtils.writeUnsignedInt32(stream, epochSeconds);
         break;
 
       case ARRAY:
         List<Object> values = (List<Object>) value;
-        stream.writeUnsignedLeb128(values.size());
+        BinaryStreamUtils.writeVarInt(stream, values.size());
         for (Object arrayValue : values) {
           writeValue(stream, columnType.arrayElementType(), arrayValue);
         }
         break;
     }
   }
 
-  static void writeRow(ClickHouseRowBinaryStream stream, TableSchema schema, 
Row row)
+  static void writeRow(ClickHouseOutputStream stream, TableSchema schema, Row 
row)
       throws IOException {
     for (TableSchema.Column column : schema.columns()) {
       if (!column.materializedOrAlias()) {
         Object value = row.getValue(column.name());
 
         if (column.columnType().nullable()) {
+          // BinaryStreamUtils.writeNull(stream);

Review Comment:
   Please remove this extra commented-out code.



##########
sdks/java/io/clickhouse/src/test/java/org/apache/beam/sdk/io/clickhouse/TableSchemaTest.java:
##########
@@ -169,13 +169,17 @@ public void testParseArrayOfArrays() {
 
   @Test
   public void testParseDefaultExpressionString() {
-    assertEquals(
-        "abc", ColumnType.parseDefaultExpression(ColumnType.STRING, 
"CAST('abc' AS String)"));
+    assertEquals("abc", ColumnType.parseDefaultExpression(ColumnType.STRING, 
"abc"));
+    //    assertEquals(

Review Comment:
   Please remove the commented-out code.



##########
sdks/java/io/clickhouse/src/test/java/org/apache/beam/sdk/io/clickhouse/AtomicInsertTest.java:
##########
@@ -92,9 +92,10 @@ public void testAtomicInsert() throws SQLException {
    * With sufficient block size, ClickHouse will atomically insert all or 
nothing. In the case of
    * replicated tables, it will deduplicate blocks.
    */
+  //  @Test

Review Comment:
   Please remove this commented-out `@Test` annotation.



##########
sdks/java/io/clickhouse/src/test/java/org/apache/beam/sdk/io/clickhouse/AtomicInsertTest.java:
##########
@@ -92,9 +92,10 @@ public void testAtomicInsert() throws SQLException {
    * With sufficient block size, ClickHouse will atomically insert all or 
nothing. In the case of
    * replicated tables, it will deduplicate blocks.
    */
+  //  @Test
   @Test
   public void testIdempotentInsert() throws SQLException {
-    int size = 1000000;
+    int size = 100000;

Review Comment:
   Why is this smaller? The size was reduced from 1000000 to 100000.



##########
sdks/java/io/clickhouse/src/test/java/org/apache/beam/sdk/io/clickhouse/AtomicInsertTest.java:
##########
@@ -56,7 +56,7 @@ private static boolean shouldAttempt(int i, long count) {
   /** With sufficient block size, ClickHouse will atomically insert all or 
nothing. */
   @Test
   public void testAtomicInsert() throws SQLException {
-    int size = 1000000;
+    int size = 100000;

Review Comment:
   Why did you shrink the size from 1000000 to 100000?



##########
sdks/java/io/clickhouse/src/test/java/org/apache/beam/sdk/io/clickhouse/TableSchemaTest.java:
##########
@@ -169,13 +169,17 @@ public void testParseArrayOfArrays() {
 
   @Test
   public void testParseDefaultExpressionString() {
-    assertEquals(
-        "abc", ColumnType.parseDefaultExpression(ColumnType.STRING, 
"CAST('abc' AS String)"));
+    assertEquals("abc", ColumnType.parseDefaultExpression(ColumnType.STRING, 
"abc"));
+    //    assertEquals(
+    //            "abc", ColumnType.parseDefaultExpression(ColumnType.STRING, 
"CAST('abc' AS
+    // String)"));
+
   }
 
   @Test
   public void testParseDefaultExpressionInt64() {
-    assertEquals(-1L, ColumnType.parseDefaultExpression(ColumnType.INT64, 
"CAST(-1 AS Int64)"));
+    assertEquals(-1L, ColumnType.parseDefaultExpression(ColumnType.INT64, 
"-1"));
+    // assertEquals(-1L, ColumnType.parseDefaultExpression(ColumnType.INT64, 
"CAST(-1 AS Int64)"));

Review Comment:
   Same here: please remove the commented-out assertion.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to