MiguelAnzoWizeline commented on a change in pull request #14586:
URL: https://github.com/apache/beam/pull/14586#discussion_r660670298
##########
File path:
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageSourceBase.java
##########
@@ -163,4 +187,23 @@
public BoundedReader<T> createReader(PipelineOptions options) throws IOException {
throw new UnsupportedOperationException("BigQuery storage source must be split before reading");
}
+
+ /*private static org.apache.arrow.vector.types.pojo.Schema convertArrowSchema(
+ ArrowSchema arrowSchema) throws IOException {
+ CodedOutputStream codedOutputStream = CodedOutputStream.newInstance(new ByteArrayOutputStream());
+ return org.apache.arrow.vector.types.pojo.Schema.deserialize(bb);
+ }*/
Review comment:
Done
##########
File path:
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageSourceBase.java
##########
@@ -163,4 +187,23 @@
public BoundedReader<T> createReader(PipelineOptions options) throws IOException {
throw new UnsupportedOperationException("BigQuery storage source must be split before reading");
}
+
+ /*private static org.apache.arrow.vector.types.pojo.Schema convertArrowSchema(
+ ArrowSchema arrowSchema) throws IOException {
+ CodedOutputStream codedOutputStream = CodedOutputStream.newInstance(new ByteArrayOutputStream());
+ return org.apache.arrow.vector.types.pojo.Schema.deserialize(bb);
+ }*/
+
+ private static ArrowSchema convertArrowSchema(
+ org.apache.arrow.vector.types.pojo.Schema arrowSchema) {
+ ByteArrayOutputStream byteOutputStream = new ByteArrayOutputStream();
+ try {
+ MessageSerializer.serialize(
+ new WriteChannel(Channels.newChannel(byteOutputStream)), arrowSchema);
+ } catch (IOException ex) {
+ throw new RuntimeException("Failed to serialize arrow schema.", ex);
+ }
+ ByteString byteString = ByteString.copyFrom(byteOutputStream.toByteArray());
+ return ArrowSchema.newBuilder().setSerializedSchema(byteString).build();
+ }
Review comment:
Done
##########
File path:
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageSourceBase.java
##########
@@ -146,7 +159,18 @@
return ImmutableList.of();
}
- Schema sessionSchema = new Schema.Parser().parse(readSession.getAvroSchema().getSchema());
+ Schema sessionSchema;
+ if (readSession.getDataFormat() == DataFormat.ARROW) {
+ org.apache.arrow.vector.types.pojo.Schema schema =
+ ArrowConversion.arrowSchemaFromInput(
+ readSession.getArrowSchema().getSerializedSchema().newInput());
+ org.apache.beam.sdk.schemas.Schema beamSchema =
+ ArrowConversion.ArrowSchemaTranslator.toBeamSchema(schema);
+ sessionSchema = AvroUtils.toAvroSchema(beamSchema);
+ } else {
Review comment:
Done
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]