mzitnik commented on code in PR #24424:
URL: https://github.com/apache/beam/pull/24424#discussion_r1045093784
##########
sdks/java/io/clickhouse/src/main/java/org/apache/beam/sdk/io/clickhouse/ClickHouseWriter.java:
##########
@@ -68,98 +70,98 @@ static void writeValue(ClickHouseRowBinaryStream stream,
ColumnType columnType,
break;
case FLOAT32:
- stream.writeFloat32((Float) value);
+ BinaryStreamUtils.writeFloat32(stream, (Float) value);
break;
case FLOAT64:
- stream.writeFloat64((Double) value);
+ BinaryStreamUtils.writeFloat64(stream, (Double) value);
break;
case INT8:
- stream.writeInt8((Byte) value);
+ BinaryStreamUtils.writeInt8(stream, (Byte) value);
break;
case INT16:
- stream.writeInt16((Short) value);
+ BinaryStreamUtils.writeInt16(stream, (Short) value);
break;
case INT32:
- stream.writeInt32((Integer) value);
+ BinaryStreamUtils.writeInt32(stream, (Integer) value);
break;
case INT64:
- stream.writeInt64((Long) value);
+ BinaryStreamUtils.writeInt64(stream, (Long) value);
break;
case STRING:
- stream.writeString((String) value);
+ BinaryStreamUtils.writeString(stream, (String) value);
break;
case UINT8:
- stream.writeUInt8((Short) value);
+ BinaryStreamUtils.writeUnsignedInt8(stream, (Short) value);
break;
case UINT16:
- stream.writeUInt16((Integer) value);
+ BinaryStreamUtils.writeUnsignedInt16(stream, (Integer) value);
break;
case UINT32:
- stream.writeUInt32((Long) value);
+ BinaryStreamUtils.writeUnsignedInt32(stream, (Long) value);
break;
case UINT64:
- stream.writeUInt64((Long) value);
+ BinaryStreamUtils.writeUnsignedInt64(stream, (Long) value);
break;
case ENUM8:
Integer enum8 = columnType.enumValues().get((String) value);
Preconditions.checkNotNull(
enum8,
"unknown enum value '" + value + "', possible values: " +
columnType.enumValues());
- stream.writeInt8(enum8);
+ BinaryStreamUtils.writeInt8(stream, enum8);
break;
case ENUM16:
Integer enum16 = columnType.enumValues().get((String) value);
Preconditions.checkNotNull(
enum16,
"unknown enum value '" + value + "', possible values: " +
columnType.enumValues());
- stream.writeInt16(enum16);
+ BinaryStreamUtils.writeInt16(stream, enum16);
break;
case DATE:
Days epochDays = Days.daysBetween(EPOCH_INSTANT, (ReadableInstant)
value);
- stream.writeUInt16(epochDays.getDays());
+ BinaryStreamUtils.writeUnsignedInt16(stream, epochDays.getDays());
break;
case DATETIME:
long epochSeconds = ((ReadableInstant) value).getMillis() / 1000L;
- stream.writeUInt32(epochSeconds);
+ BinaryStreamUtils.writeUnsignedInt32(stream, epochSeconds);
break;
case ARRAY:
List<Object> values = (List<Object>) value;
- stream.writeUnsignedLeb128(values.size());
+ BinaryStreamUtils.writeVarInt(stream, values.size());
for (Object arrayValue : values) {
writeValue(stream, columnType.arrayElementType(), arrayValue);
}
break;
}
}
- static void writeRow(ClickHouseRowBinaryStream stream, TableSchema schema,
Row row)
+ static void writeRow(ClickHouseOutputStream stream, TableSchema schema, Row
row)
throws IOException {
for (TableSchema.Column column : schema.columns()) {
if (!column.materializedOrAlias()) {
Object value = row.getValue(column.name());
if (column.columnType().nullable()) {
+ // BinaryStreamUtils.writeNull(stream);
Review Comment:
Removed the commented-out `BinaryStreamUtils.writeNull(stream)` line.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]