chamikaramj commented on code in PR #30805:
URL: https://github.com/apache/beam/pull/30805#discussion_r1559720847


##########
sdks/java/io/iceberg/src/test/java/org/apache/beam/io/iceberg/IcebergIOReadTest.java:
##########
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.io.iceberg;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.containsInAnyOrder;
+
+import java.util.List;
+import java.util.UUID;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import org.apache.beam.sdk.coders.RowCoder;
+import org.apache.beam.sdk.io.BoundedSource;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.SourceTestUtils;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.Row;
+import org.apache.iceberg.CatalogUtil;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@RunWith(JUnit4.class)
+public class IcebergIOReadTest {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(IcebergIOReadTest.class);
+
+  @ClassRule public static final TemporaryFolder TEMPORARY_FOLDER = new 
TemporaryFolder();
+
+  @Rule public TestDataWarehouse warehouse = new 
TestDataWarehouse(TEMPORARY_FOLDER, "default");
+
+  @Rule public TestPipeline testPipeline = TestPipeline.create();
+
+  static class PrintRow extends DoFn<Row, Row> { // Pass-through DoFn: logs each row, then emits it unchanged.
+
+    @ProcessElement
+    public void process(@Element Row row, OutputReceiver<Row> output) throws 
Exception {
+      LOG.info("Got row {}", row); // one INFO line per element
+      output.output(row); // the same Row instance is forwarded
+    }
+  }
+
+  @Test // Commits three data files in one append, reads the table through IcebergIO, and checks that every fixture row comes back.
+  public void testSimpleScan() throws Exception {
+    TableIdentifier tableId =
+        TableIdentifier.of("default", "table" + 
Long.toString(UUID.randomUUID().hashCode(), 16)); // NOTE(review): hashCode() can be negative, so the table name may contain '-'
+    Table simpleTable = warehouse.createTable(tableId, TestFixtures.SCHEMA);
+    final Schema schema = 
SchemaAndRowConversions.icebergSchemaToBeamSchema(TestFixtures.SCHEMA);
+
+    simpleTable
+        .newFastAppend()
+        .appendFile(
+            warehouse.writeRecords(
+                "file1s1.parquet", simpleTable.schema(), 
TestFixtures.FILE1SNAPSHOT1))
+        .appendFile(
+            warehouse.writeRecords(
+                "file2s1.parquet", simpleTable.schema(), 
TestFixtures.FILE2SNAPSHOT1))
+        .appendFile(
+            warehouse.writeRecords(
+                "file3s1.parquet", simpleTable.schema(), 
TestFixtures.FILE3SNAPSHOT1))
+        .commit(); // all three files are committed together by this single append
+
+    final List<Row> expectedRows = // every fixture record, converted to a Beam Row
+        Stream.of(
+                TestFixtures.FILE1SNAPSHOT1,
+                TestFixtures.FILE2SNAPSHOT1,
+                TestFixtures.FILE3SNAPSHOT1)
+            .flatMap(List::stream)
+            .map(record -> SchemaAndRowConversions.recordToRow(schema, record))
+            .collect(Collectors.toList());
+
+    IcebergCatalogConfig catalogConfig =
+        IcebergCatalogConfig.builder()
+            .setName("hadoop")
+            .setIcebergCatalogType(CatalogUtil.ICEBERG_CATALOG_TYPE_HADOOP)
+            .setWarehouseLocation(warehouse.location)
+            .build();
+
+    PCollection<Row> output =
+        testPipeline
+            .apply(IcebergIO.readTable(catalogConfig, tableId))
+            .apply(ParDo.of(new PrintRow()))
+            .setCoder(
+                RowCoder.of(
+                    
SchemaAndRowConversions.icebergSchemaToBeamSchema(TestFixtures.SCHEMA))); // NOTE(review): same conversion as the `schema` local above; it could be reused
+
+    PAssert.that(output)
+        .satisfies(
+            (Iterable<Row> rows) -> {
+              assertThat(rows, containsInAnyOrder(expectedRows.toArray())); // order-insensitive comparison
+              return null;
+            });
+
+    testPipeline.run();
+  }
+

Review Comment:
   Let's also add a test for splitting using 
`SourceTestUtils.assertSourcesEqualReferenceSource`.



##########
sdks/java/io/iceberg/src/main/java/org/apache/beam/io/iceberg/IcebergBoundedSource.java:
##########
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.io.iceberg;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.RowCoder;
+import org.apache.beam.sdk.io.BoundedSource;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.values.Row;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.BaseCombinedScanTask;
+import org.apache.iceberg.CombinedScanTask;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.TableScan;
+import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.io.CloseableIterable;
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+public class IcebergBoundedSource extends BoundedSource<Row> {
+
+  private @Nullable CombinedScanTask task;
+  private IcebergScan scan;
+
+  public IcebergBoundedSource(IcebergScan scan, @Nullable CombinedScanTask 
task) { // Binds a scan description to an optional, already-planned task.
+    this.task = task; // may be null; getTask() then plans one lazily
+    this.scan = scan;
+  }
+
+  public IcebergBoundedSource(IcebergScan scan) { // Convenience constructor: no pre-planned task.
+    this(scan, null);
+  }
+
+  public Catalog catalog() { // Returns the Catalog obtained through scan.getCatalog().
+    return scan.getCatalog().catalog();
+  }
+
+  public Table table() { // Loads the scan's table from the catalog; loadTable runs on every call (nothing is cached here).
+    return catalog()
+        .loadTable(TableIdentifier.of(scan.getTable().toArray(new 
String[scan.getTable().size()]))); // the identifier is built from the strings in scan.getTable()
+  }
+
+  private TableScan getTableScan() { // Starts a new scan of the table, projected to the scan's schema.
+    // Always project to our destination schema
+    return table().newScan().project(SchemaHelper.convert(scan.getSchema())); // NOTE(review): filter/caseSensitive/snapshot/branch/tag are applied only in split(), not here
+  }
+
+  private CombinedScanTask getTask() { // Returns the supplied task, or lazily builds one from every planned file.
+    if (task == null) {
+      task = new 
BaseCombinedScanTask(ImmutableList.copyOf(getTableScan().planFiles())); // NOTE(review): if planFiles() returns a CloseableIterable (as planTasks() does in split()), it is never closed here — confirm
+    }
+    return task; // kept in the field, so planning happens at most once per instance
+  }
+
+  @Override
+  public List<? extends BoundedSource<Row>> split(
+      long desiredBundleSizeBytes, PipelineOptions options) throws Exception {
+    ArrayList<IcebergBoundedSource> splits = new ArrayList<>();
+    switch (scan.getType()) {
+      case TABLE:
+        TableScan tableScan = getTableScan();
+
+        // Override the split size with our desired size
+        if (desiredBundleSizeBytes > 0) {
+          tableScan =
+              tableScan.option(TableProperties.SPLIT_SIZE, 
Long.toString(desiredBundleSizeBytes));
+        }
+        if (scan.getFilter() != null) {
+          tableScan = tableScan.filter(scan.getFilter());
+        }
+        if (scan.getCaseSensitive() != null) {
+          tableScan = tableScan.caseSensitive(scan.getCaseSensitive());
+        }
+        if (scan.getSnapshot() != null) {
+          tableScan = tableScan.useSnapshot(scan.getSnapshot());
+        }
+        if (scan.getBranch() != null) {
+          tableScan = tableScan.useRef(scan.getBranch());
+        } else if (scan.getTag() != null) {
+          tableScan = tableScan.useRef(scan.getTag());
+        }
+        try (CloseableIterable<CombinedScanTask> t = tableScan.planTasks()) {
+          for (CombinedScanTask c : t) {
+            splits.add(new IcebergBoundedSource(scan, c));
+          }
+        } catch (IOException e) {
+          throw new RuntimeException(e);
+        }
+        break;
+      case BATCH:
+        throw new UnsupportedOperationException("BATCH scan not supported");
+    }
+    return splits;

Review Comment:
   What will happen if the sources produced at line 104 above (the per-task 
`IcebergBoundedSource` instances added to `splits`) get re-split by the runner?
   
   If such sources cannot be re-split, we should have a trivial case where we 
just return the original source to prevent data loss.



##########
sdks/java/io/iceberg/src/main/java/org/apache/beam/io/iceberg/CombinedScanReader.java:
##########
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.io.iceberg;
+
+import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull;
+
+import java.io.IOException;
+import java.util.ArrayDeque;
+import java.util.NoSuchElementException;
+import java.util.Queue;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.io.BoundedSource;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.values.Row;
+import org.apache.iceberg.CombinedScanTask;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.avro.Avro;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.data.avro.DataReader;
+import org.apache.iceberg.data.orc.GenericOrcReader;
+import org.apache.iceberg.data.parquet.GenericParquetReaders;
+import org.apache.iceberg.encryption.EncryptionManager;
+import org.apache.iceberg.encryption.InputFilesDecryptor;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.CloseableIterator;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.orc.ORC;
+import org.apache.iceberg.parquet.Parquet;
+import org.checkerframework.checker.nullness.qual.NonNull;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class CombinedScanReader extends BoundedSource.BoundedReader<Row> {
+  private static final Logger LOG = 
LoggerFactory.getLogger(CombinedScanReader.class);
+
+  private final IcebergBoundedSource source;
+  private final Table table;
+
+  private final CombinedScanTask task;
+
+  private final Schema schema;
+
+  transient @Nullable org.apache.iceberg.Schema project;
+  transient @Nullable FileIO io;
+  transient @Nullable InputFilesDecryptor decryptor;
+  transient @Nullable Queue<FileScanTask> fileScanTasks;
+  transient @Nullable CloseableIterator<Record> currentIterator;
+  transient @Nullable Record current;
+
+  public CombinedScanReader(IcebergBoundedSource source, CombinedScanTask 
task, Schema schema) { // Reader over the files of one CombinedScanTask; records are converted using the given Beam schema.
+    this.source = source;
+    this.table =
+        checkStateNotNull(
+            source.table(), "CombinedScanReader requires an 
IcebergBoundedSource with a table"); // source.table() loads the table from the catalog
+    this.task = task;
+    this.schema = schema;
+    if (this.schema != null) { // project stays null when no schema is given
+      project = SchemaHelper.convert(this.schema);
+    }
+  }
+
+  @Override // Sets up the decryptor and the queue of file tasks, then moves to the first record.
+  public boolean start() throws IOException {
+    EncryptionManager encryptionManager = table.encryption();
+    io = table.io();
+    decryptor = new InputFilesDecryptor(task, io, encryptionManager);
+
+    fileScanTasks = new ArrayDeque<>();
+    fileScanTasks.addAll(task.files()); // advance() removes them from the head, i.e. in this order
+
+    return advance(); // true if at least one record is available
+  }
+
+  @Override // Moves to the next record, opening the next queued file whenever the current one is exhausted.
+  public boolean advance() throws IOException {
+    Queue<FileScanTask> fileScanTasks =
+        checkStateNotNull(this.fileScanTasks, "files null in advance() - did 
you call start()?");
+    InputFilesDecryptor decryptor =
+        checkStateNotNull(this.decryptor, "decryptor null in adance() - did 
you call start()?"); // NOTE(review): "adance()" in this message looks like a typo for "advance()"
+
+    // This nullness annotation is incorrect, but the most expedient way to 
work with Iceberg's APIs
+    // which are not null-safe.
+    @SuppressWarnings("nullness")
+    org.apache.iceberg.@NonNull Schema project = this.project;
+
+    do {
+      // If our current iterator is working... do that.
+      if (currentIterator != null && currentIterator.hasNext()) {
+        current = currentIterator.next();
+        return true;
+      }
+
+      // Close out the current iterator and try to open a new one
+      if (currentIterator != null) {
+        currentIterator.close();
+        currentIterator = null;
+      }
+
+      LOG.info("Trying to open new file.");
+      if (fileScanTasks.isEmpty()) {
+        LOG.info("We have exhausted all available files in this 
CombinedScanTask");
+        break;
+      }
+
+      // We have a new file to start reading
+      FileScanTask fileTask = fileScanTasks.remove();
+      DataFile file = fileTask.file();
+      InputFile input = decryptor.getInputFile(fileTask);
+
+      CloseableIterable<Record> iterable;
+      switch (file.format()) {
+        case ORC:
+          LOG.info("Preparing ORC input");
+          iterable =
+              ORC.read(input)
+                  .split(fileTask.start(), fileTask.length())
+                  .project(project)
+                  .createReaderFunc(fileSchema -> 
GenericOrcReader.buildReader(project, fileSchema))
+                  .filter(fileTask.residual())
+                  .build();
+          break;
+        case PARQUET:
+          LOG.info("Preparing Parquet input.");
+          iterable =
+              Parquet.read(input)
+                  .split(fileTask.start(), fileTask.length())
+                  .project(project)
+                  .createReaderFunc(
+                      fileSchema -> GenericParquetReaders.buildReader(project, 
fileSchema))
+                  .filter(fileTask.residual())
+                  .build();
+          break;
+        case AVRO:
+          LOG.info("Preparing Avro input.");
+          iterable =
+              Avro.read(input)
+                  .split(fileTask.start(), fileTask.length())
+                  .project(project)
+                  .createReaderFunc(DataReader::create)
+                  .build(); // NOTE(review): no .filter(fileTask.residual()) here, unlike the ORC and Parquet branches — confirm intended
+          break;
+        default:
+          throw new UnsupportedOperationException("Cannot read format: " + 
file.format());
+      }
+      currentIterator = iterable.iterator(); // the next loop pass reads from this file, or moves on if it has no records
+
+    } while (true);
+
+    return false; // reached only through the break above (no files left); `current` is not reset
+  }
+
+  @Override // Converts the record most recently read by advance() into a Beam Row.
+  public Row getCurrent() throws NoSuchElementException {
+    if (current == null) { // no record has been read yet
+      throw new NoSuchElementException();
+    }
+    return RowHelper.recordToRow(schema, current); // NOTE(review): the conversion class in this PR is SchemaAndRowConversions; confirm RowHelper is the intended name
+  }
+
+  @Override // Releases the open file iterator, the pending task queue, and the FileIO.
+  public void close() throws IOException {
+    if (currentIterator != null) {
+      currentIterator.close();
+    }
+    if (fileScanTasks != null) {
+      fileScanTasks.clear();
+      fileScanTasks = null;
+    }
+    if (io != null) {
+      io.close(); // NOTE(review): io comes from table.io() in start(); confirm closing it cannot affect other users of the same table
+    }
+  }
+
+  @Override
+  public BoundedSource<Row> getCurrentSource() {

Review Comment:
   Is it possible to also implement `getFractionConsumed` to support progress 
reporting in a meaningful way?
   
   I think it will be very useful for autoscaling when using this with Dataflow.



##########
sdks/java/io/iceberg/src/main/java/org/apache/beam/io/iceberg/SchemaAndRowConversions.java:
##########
@@ -0,0 +1,270 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.io.iceberg;
+
+import static org.apache.beam.sdk.util.Preconditions.checkArgumentNotNull;
+
+import java.nio.ByteBuffer;
+import java.util.Optional;
+import java.util.UUID;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.values.Row;
+import org.apache.iceberg.data.GenericRecord;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.types.Type;
+import org.apache.iceberg.types.Types;
+import org.joda.time.DateTime;
+import org.joda.time.DateTimeZone;
+
+class SchemaAndRowConversions {
+
+  private SchemaAndRowConversions() {}
+
+  public static final String ICEBERG_TYPE_OPTION_NAME = "icebergTypeID";
+
+  public static Schema.FieldType icebergTypeToBeamFieldType(final Type type) { // Maps an Iceberg type to a Beam field type, recursing into struct, list and map types.
+    switch (type.typeId()) {
+      case BOOLEAN:
+        return Schema.FieldType.BOOLEAN;
+      case INTEGER:
+        return Schema.FieldType.INT32;
+      case LONG:
+        return Schema.FieldType.INT64;
+      case FLOAT:
+        return Schema.FieldType.FLOAT;
+      case DOUBLE:
+        return Schema.FieldType.DOUBLE;
+      case DATE:
+      case TIME:
+      case TIMESTAMP: // TODO: Logical types?
+        return Schema.FieldType.DATETIME; // all three temporal types collapse to DATETIME
+      case STRING:
+        return Schema.FieldType.STRING;
+      case UUID:
+      case BINARY:
+        return Schema.FieldType.BYTES;
+      case FIXED:
+      case DECIMAL:
+        return Schema.FieldType.DECIMAL; // NOTE(review): FIXED shares the DECIMAL mapping — confirm that is intended
+      case STRUCT:
+        return 
Schema.FieldType.row(icebergStructTypeToBeamSchema(type.asStructType()));
+      case LIST:
+        return Schema.FieldType.iterable(
+            icebergTypeToBeamFieldType(type.asListType().elementType()));
+      case MAP:
+        return Schema.FieldType.map(
+            icebergTypeToBeamFieldType(type.asMapType().keyType()),
+            icebergTypeToBeamFieldType(type.asMapType().valueType()));
+    }
+    throw new RuntimeException("Unrecognized IcebergIO Type"); // reached only for a type id with no case above
+  }
+
+  public static Schema.Field icebergFieldToBeamField(final Types.NestedField 
field) { // Converts one Iceberg field to a Beam field with the same name.
+    return Schema.Field.of(field.name(), 
icebergTypeToBeamFieldType(field.type()))
+        .withOptions(
+            Schema.Options.builder()
+                .setOption(
+                    ICEBERG_TYPE_OPTION_NAME, Schema.FieldType.STRING, 
field.type().typeId().name()) // the original Iceberg type id is kept as a string option
+                .build())
+        .withNullable(field.isOptional()); // optional Iceberg fields become nullable Beam fields
+  }
+
+  public static Schema icebergSchemaToBeamSchema(final 
org.apache.iceberg.Schema schema) { // Builds a Beam schema with one field per Iceberg column, in column order.
+    Schema.Builder builder = Schema.builder();
+    for (Types.NestedField f : schema.columns()) {
+      builder.addField(icebergFieldToBeamField(f));
+    }
+    return builder.build();
+  }
+
+  public static Schema icebergStructTypeToBeamSchema(final Types.StructType 
struct) { // Same conversion as icebergSchemaToBeamSchema, but over the fields of a struct type.
+    Schema.Builder builder = Schema.builder();
+    for (Types.NestedField f : struct.fields()) {
+      builder.addField(icebergFieldToBeamField(f));
+    }
+    return builder.build();
+  }
+
+  public static Types.NestedField beamFieldToIcebergField(int fieldId, final 
Schema.Field field) { // Builds an Iceberg field with the given id from a Beam field.
+    String typeId = field.getOptions().getValue(ICEBERG_TYPE_OPTION_NAME, 
String.class);
+    if (typeId != null) {
+      return Types.NestedField.of(
+          fieldId,
+          field.getType().getNullable(),
+          field.getName(),
+          Types.fromPrimitiveString(typeId)); // NOTE(review): icebergFieldToBeamField writes this option as typeId().name(); confirm fromPrimitiveString accepts that spelling
+    } else {
+      return Types.NestedField.of(
+          fieldId, field.getType().getNullable(), field.getName(), 
Types.StringType.get()); // without the option the field becomes a string, whatever its Beam type
+    }
+  }
+
+  public static org.apache.iceberg.Schema beamSchemaToIcebergSchema(final 
Schema schema) { // Converts each Beam field, in order, into a field of a new Iceberg schema.
+    Types.NestedField[] fields = new Types.NestedField[schema.getFieldCount()];
+    int fieldId = 0;
+    for (Schema.Field f : schema.getFields()) {
+      fields[fieldId++] = beamFieldToIcebergField(fieldId, f); // the index is read before the increment and the argument after it, so ids run 1..n
+    }
+    return new org.apache.iceberg.Schema(fields);
+  }
+
+  public static Record rowToRecord(org.apache.iceberg.Schema schema, Row row) { // Creates a record for the schema and fills it from the Row.
+    return copyRowIntoRecord(GenericRecord.create(schema), row);
+  }
+
+  private static Record copyRowIntoRecord(Record baseRecord, Row value) { // Fills a copy of baseRecord, field by field, from the Row.
+    Record rec = baseRecord.copy(); // baseRecord itself is not modified
+    for (Types.NestedField f : rec.struct().fields()) {
+      copyFieldIntoRecord(rec, f, value);
+    }
+    return rec;
+  }
+
+  private static void copyFieldIntoRecord(Record rec, Types.NestedField field, 
Row value) { // Copies one named field from the Row into rec; for a null value no setField call is made.
+    String name = field.name();
+    switch (field.type().typeId()) { // no default branch: a type id without a case below is silently skipped
+      case BOOLEAN:
+        Optional.ofNullable(value.getBoolean(name)).ifPresent(v -> 
rec.setField(name, v));
+        break;
+      case INTEGER:
+        Optional.ofNullable(value.getInt32(name)).ifPresent(v -> 
rec.setField(name, v));
+        break;
+      case LONG:
+        Optional.ofNullable(value.getInt64(name)).ifPresent(v -> 
rec.setField(name, v));
+        break;
+      case FLOAT:
+        Optional.ofNullable(value.getFloat(name)).ifPresent(v -> 
rec.setField(name, v));
+        break;
+      case DOUBLE:
+        Optional.ofNullable(value.getDouble(name)).ifPresent(v -> 
rec.setField(name, v));
+        break;
+      case DATE:
+        throw new UnsupportedOperationException("Date fields not yet 
supported");
+      case TIME:
+        throw new UnsupportedOperationException("Time fields not yet 
supported");
+      case TIMESTAMP:
+        Optional.ofNullable(value.getDateTime(name))
+            .ifPresent(v -> rec.setField(name, v.getMillis())); // stored as the DateTime's millisecond value
+        break;
+      case STRING:
+        Optional.ofNullable(value.getString(name)).ifPresent(v -> 
rec.setField(name, v));
+        break;
+      case UUID:
+        Optional.ofNullable(value.getBytes(name))
+            .ifPresent(v -> rec.setField(name, UUID.nameUUIDFromBytes(v))); // NOTE(review): nameUUIDFromBytes derives a name-based UUID from the bytes rather than decoding them — confirm intended
+        break;
+      case FIXED:
+        throw new UnsupportedOperationException("Fixed-precision fields are 
not yet supported.");
+      case BINARY:
+        Optional.ofNullable(value.getBytes(name))
+            .ifPresent(v -> rec.setField(name, ByteBuffer.wrap(v))); // wraps the array without copying it
+        break;
+      case DECIMAL:
+        Optional.ofNullable(value.getDecimal(name)).ifPresent(v -> 
rec.setField(name, v));
+        break;
+      case STRUCT:
+        Optional.ofNullable(value.getRow(name))
+            .ifPresent(
+                row ->
+                    rec.setField(
+                        name,
+                        
copyRowIntoRecord(GenericRecord.create(field.type().asStructType()), row))); // nested rows are converted recursively
+        break;
+      case LIST:
+        throw new UnsupportedOperationException("List fields are not yet 
supported.");
+      case MAP:
+        throw new UnsupportedOperationException("Map fields are not yet 
supported.");
+    }
+  }
+
+  public static Row recordToRow(Schema schema, Record record) {
+    Row.Builder rowBuilder = Row.withSchema(schema);
+    for (Schema.Field field : schema.getFields()) {
+      switch (field.getType().getTypeName()) {
+        case BYTE:
+          // I guess allow anything we can cast here

Review Comment:
   Could we add unit tests covering the different field types?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to