DanielMorales9 opened a new issue, #32865:
URL: https://github.com/apache/beam/issues/32865

   ### What happened?
   
   When I attempt to write data into an hourly partitioned Iceberg table, the write fails regardless of whether the `event_time` field uses Beam's standard DATETIME type or the `SqlTypes.DATETIME` logical type.

   Iceberg's partition accessor expects a `Long`, but Beam passes the timestamp object through unconverted (a joda `Instant` or a `java.time.LocalDateTime`, depending on the schema).
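
   For reference, the value Iceberg's partition accessor requests for a `timestamp`/`timestamptz` column is a `java.lang.Long` holding epoch microseconds. A minimal sketch of the conversions that would be needed before partitioning (the helper class and method names are mine, for illustration only, not Beam's API):

   ```java
   import java.time.LocalDateTime;
   import java.time.ZoneOffset;
   import java.time.temporal.ChronoUnit;

   // Hypothetical helpers showing the representation Iceberg's partition
   // accessor expects: epoch microseconds as a Long.
   class TimestampMicros {

       // A joda Instant carries epoch milliseconds; scale them to microseconds.
       static long fromJoda(org.joda.time.Instant instant) {
           return instant.getMillis() * 1000L;
       }

       // LocalDateTime (SqlTypes.DATETIME) has no zone; it is treated as UTC
       // here, matching timestamp-without-zone semantics.
       static long fromLocalDateTime(LocalDateTime ldt) {
           return ChronoUnit.MICROS.between(
                   LocalDateTime.ofEpochSecond(0, 0, ZoneOffset.UTC), ldt);
       }
   }
   ```

   The parameterized test below reproduces both cases.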
   
   ```java
   package com.experiment.beam;
   
   import org.apache.beam.sdk.Pipeline;
   import org.apache.beam.sdk.managed.Managed;
   import org.apache.beam.sdk.options.PipelineOptions;
   import org.apache.beam.sdk.options.PipelineOptionsFactory;
   import org.apache.beam.sdk.schemas.Schema;
   import org.apache.beam.sdk.schemas.logicaltypes.SqlTypes;
   import org.apache.beam.sdk.transforms.Create;
   import org.apache.beam.sdk.values.Row;
   import org.apache.hadoop.conf.Configuration;
   import org.apache.iceberg.PartitionSpec;
   import org.apache.iceberg.catalog.TableIdentifier;
   import org.apache.iceberg.hadoop.HadoopCatalog;
   import org.apache.iceberg.types.Types;
   import org.joda.time.Instant;
   import org.junit.After;
   import org.junit.Before;
   import org.junit.Test;
   import org.junit.runner.RunWith;
   import org.junit.runners.Parameterized;
   
   import java.io.File;
   import java.time.LocalDateTime;
   import java.time.temporal.ChronoUnit;
   import java.util.*;
   
   import static org.junit.Assert.assertTrue;
   
   @RunWith(Parameterized.class)
   public class HourlyPartitionedIcebergTableTest {
   
       // Constants for testing
       private static final String WAREHOUSE_DIR =
               new File("src/test/resources/iceberg-test").getAbsolutePath();
       private static final String WAREHOUSE_PATH = String.format("file:///%s", WAREHOUSE_DIR);
       private static final Map<String, Object> CATALOG_CONFIG = new HashMap<String, Object>() {{
           put("warehouse", WAREHOUSE_PATH);
           put("type", "hadoop");
       }};
       private static final Map<String, Object> ICEBERG_CONFIG = new HashMap<String, Object>() {{
           put("catalog_name", "iceberg");
           put("catalog_properties", CATALOG_CONFIG);
       }};
       private static final Configuration CONF = new Configuration();
       private static final HadoopCatalog HADOOP_CATALOG = new HadoopCatalog(CONF, WAREHOUSE_PATH);
   
       private final String schemaName;
       private final String tableName;
       private final Schema beamSchema;
       private final List<Row> testData;
       private TableIdentifier tableIdentifier;
   
   
       @Parameterized.Parameters
       public static Collection<Object[]> data() {
           Schema schemaWithSQLDatetime = Schema.builder()
                   .addStringField("id")
                   .addLogicalTypeField("event_time", SqlTypes.DATETIME)
                   .build();
           Schema schemaWithDatetime = Schema.builder()
                   .addStringField("id")
                   .addDateTimeField("event_time")
                   .build();
           return Arrays.asList(new Object[][]{
                   {
                           "myschema",
                           "test1",
                           schemaWithDatetime,
                           Arrays.asList(
                                   Row.withSchema(schemaWithDatetime)
                                           .addValues("record-1", Instant.now())
                                           .build(),
                                   Row.withSchema(schemaWithDatetime)
                                           .addValues("record-2", 
Instant.now().minus(3600 * 1000))
                                           .build()
                           )
                   },
                   {
                           "myschema",
                           "test2",
                           schemaWithSQLDatetime,
                           Arrays.asList(
                                   Row.withSchema(schemaWithSQLDatetime)
                                           .addValues("record-3", 
LocalDateTime.now())
                                           .build(),
                                   Row.withSchema(schemaWithSQLDatetime)
                                           .addValues("record-4", 
LocalDateTime.now().minus(1, ChronoUnit.HOURS))
                                           .build()
                           )
                   },
           });
       }
   
       public HourlyPartitionedIcebergTableTest(String schemaName, String tableName, Schema beamSchema, List<Row> hardcodedData) {
           this.schemaName = schemaName;
           this.tableName = tableName;
           this.beamSchema = beamSchema;
           this.testData = hardcodedData;
       }
   
       @Before
       public void setUp() {
           // Create table identifier for the test
           tableIdentifier = TableIdentifier.of(schemaName, tableName);
   
           // Define Iceberg schema
           org.apache.iceberg.Schema icebergSchema = new org.apache.iceberg.Schema(
                   Types.NestedField.required(1, "id", Types.StringType.get()),
                Types.NestedField.required(2, "event_time", Types.TimestampType.withZone())
           );
   
           PartitionSpec spec = PartitionSpec.builderFor(icebergSchema)
                   .hour("event_time")
                   .build();
   
           // Create the Iceberg table
           HADOOP_CATALOG.createTable(tableIdentifier, icebergSchema, spec);
       }
   
       @Test
       public void testPipeline() {
           // Define Beam pipeline options
           PipelineOptions options = PipelineOptionsFactory.create();
           Pipeline pipeline = Pipeline.create(options);
   
           ICEBERG_CONFIG.put("table", String.format("%s.%s", schemaName, tableName));
   
           // Build the pipeline using the hardcoded data
           pipeline
                   .apply("CreateHardcodedData", 
Create.of(testData).withRowSchema(beamSchema))
                   .apply("WriteToIceberg", 
Managed.write(Managed.ICEBERG).withConfig(ICEBERG_CONFIG));
   
           // Run the pipeline
           pipeline.run().waitUntilFinish();
   
           // Verify that the table exists after data is written
           assertTrue("Table should exist after writing data.", 
HADOOP_CATALOG.tableExists(tableIdentifier));
       }
   
       @After
       public void tearDown() {
           // Drop the table after the test
           if (HADOOP_CATALOG.tableExists(tableIdentifier)) {
               HADOOP_CATALOG.dropTable(tableIdentifier);
           }
       }
   }
   ```
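
   The failing check can also be reproduced without Beam by handing `PartitionKey.partition` a record whose `event_time` is not a `Long`. A minimal sketch against the Iceberg API (my own reduction of the stack traces below, not part of the original pipeline):

   ```java
   import org.apache.iceberg.PartitionKey;
   import org.apache.iceberg.PartitionSpec;
   import org.apache.iceberg.data.GenericRecord;
   import org.apache.iceberg.types.Types;

   class PartitionKeyRepro {
       public static void main(String[] args) {
           org.apache.iceberg.Schema schema = new org.apache.iceberg.Schema(
                   Types.NestedField.required(1, "id", Types.StringType.get()),
                   Types.NestedField.required(2, "event_time", Types.TimestampType.withZone()));
           PartitionSpec spec = PartitionSpec.builderFor(schema).hour("event_time").build();

           GenericRecord record = GenericRecord.create(schema);
           record.setField("id", "record-1");
           record.setField("event_time", org.joda.time.Instant.now()); // not a Long

           // The hour transform's accessor calls record.get(pos, Long.class), which
           // throws IllegalStateException("Not an instance of java.lang.Long: ...").
           new PartitionKey(spec, schema).partition(record);
       }
   }
   ```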
   
   Test results (one failure per parameterized schema):
   ```text
   org.apache.beam.sdk.Pipeline$PipelineExecutionException: java.lang.IllegalStateException: Not an instance of java.lang.Long: 2024-10-18T16:14:32.934Z

        at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:377)
        at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:345)
        at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:218)
        at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:67)
        at org.apache.beam.sdk.Pipeline.run(Pipeline.java:325)
        at org.apache.beam.sdk.Pipeline.run(Pipeline.java:310)
        at com.experiment.beam.HourlyPartitionedIcebergTableTest.testPipeline(HourlyPartitionedIcebergTableTest.java:136)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.base/java.lang.reflect.Method.invoke(Method.java:566)
        at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
        at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
        at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
        at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
        at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
        at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
        at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
        at org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
        at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366)
        at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
        at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
        at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
        at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
        at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
        at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
        at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
        at org.junit.runners.ParentRunner.run(ParentRunner.java:413)
        at org.junit.runners.Suite.runChild(Suite.java:128)
        at org.junit.runners.Suite.runChild(Suite.java:27)
        at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
        at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
        at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
        at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
        at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
        at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
        at org.junit.runners.ParentRunner.run(ParentRunner.java:413)
        at org.junit.runners.Suite.runChild(Suite.java:128)
        at org.junit.runners.Suite.runChild(Suite.java:27)
        at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
        at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
        at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
        at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
        at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
        at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
        at org.junit.runners.ParentRunner.run(ParentRunner.java:413)
        at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
        at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:69)
        at com.intellij.rt.junit.IdeaTestRunner$Repeater$1.execute(IdeaTestRunner.java:38)
        at com.intellij.rt.execution.junit.TestsRepeater.repeat(TestsRepeater.java:11)
        at com.intellij.rt.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:35)
        at com.intellij.rt.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:235)
        at com.intellij.rt.junit.JUnitStarter.main(JUnitStarter.java:54)
   Caused by: java.lang.IllegalStateException: Not an instance of java.lang.Long: 2024-10-18T16:14:32.934Z
        at org.apache.iceberg.data.GenericRecord.get(GenericRecord.java:123)
        at org.apache.iceberg.Accessors$PositionAccessor.get(Accessors.java:71)
        at org.apache.iceberg.Accessors$PositionAccessor.get(Accessors.java:58)
        at org.apache.iceberg.PartitionKey.partition(PartitionKey.java:106)
        at org.apache.beam.sdk.io.iceberg.RecordWriterManager$DestinationState.write(RecordWriterManager.java:136)
        at org.apache.beam.sdk.io.iceberg.RecordWriterManager.write(RecordWriterManager.java:270)
        at org.apache.beam.sdk.io.iceberg.WriteUngroupedRowsToFiles$WriteUngroupedRowsToFilesDoFn.processElement(WriteUngroupedRowsToFiles.java:243)
   ```
   
   ```text
   
   org.apache.beam.sdk.Pipeline$PipelineExecutionException: java.lang.IllegalStateException: Not an instance of java.lang.Long: 2024-10-18T17:14:32.961579Z

        at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:377)
        at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:345)
        at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:218)
        at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:67)
        at org.apache.beam.sdk.Pipeline.run(Pipeline.java:325)
        at org.apache.beam.sdk.Pipeline.run(Pipeline.java:310)
        at com.experiment.beam.HourlyPartitionedIcebergTableTest.testPipeline(HourlyPartitionedIcebergTableTest.java:136)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.base/java.lang.reflect.Method.invoke(Method.java:566)
        at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
        at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
        at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
        at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
        at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
        at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
        at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
        at org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
        at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366)
        at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
        at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
        at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
        at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
        at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
        at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
        at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
        at org.junit.runners.ParentRunner.run(ParentRunner.java:413)
        at org.junit.runners.Suite.runChild(Suite.java:128)
        at org.junit.runners.Suite.runChild(Suite.java:27)
        at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
        at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
        at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
        at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
        at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
        at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
        at org.junit.runners.ParentRunner.run(ParentRunner.java:413)
        at org.junit.runners.Suite.runChild(Suite.java:128)
        at org.junit.runners.Suite.runChild(Suite.java:27)
        at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
        at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
        at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
        at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
        at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
        at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
        at org.junit.runners.ParentRunner.run(ParentRunner.java:413)
        at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
        at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:69)
        at com.intellij.rt.junit.IdeaTestRunner$Repeater$1.execute(IdeaTestRunner.java:38)
        at com.intellij.rt.execution.junit.TestsRepeater.repeat(TestsRepeater.java:11)
        at com.intellij.rt.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:35)
        at com.intellij.rt.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:235)
        at com.intellij.rt.junit.JUnitStarter.main(JUnitStarter.java:54)
   Caused by: java.lang.IllegalStateException: Not an instance of java.lang.Long: 2024-10-18T17:14:32.961579Z
        at org.apache.iceberg.data.GenericRecord.get(GenericRecord.java:123)
        at org.apache.iceberg.Accessors$PositionAccessor.get(Accessors.java:71)
        at org.apache.iceberg.Accessors$PositionAccessor.get(Accessors.java:58)
        at org.apache.iceberg.PartitionKey.partition(PartitionKey.java:106)
        at org.apache.beam.sdk.io.iceberg.RecordWriterManager$DestinationState.write(RecordWriterManager.java:136)
        at org.apache.beam.sdk.io.iceberg.RecordWriterManager.write(RecordWriterManager.java:270)
        at org.apache.beam.sdk.io.iceberg.WriteUngroupedRowsToFiles$WriteUngroupedRowsToFilesDoFn.processElement(WriteUngroupedRowsToFiles.java:243)
   ```
   
   ### Issue Priority
   
   Priority: 2 (default / most bugs should be filed as P2)
   
   ### Issue Components
   
   - [ ] Component: Python SDK
   - [X] Component: Java SDK
   - [ ] Component: Go SDK
   - [ ] Component: Typescript SDK
   - [X] Component: IO connector
   - [ ] Component: Beam YAML
   - [ ] Component: Beam examples
   - [ ] Component: Beam playground
   - [ ] Component: Beam katas
   - [ ] Component: Website
   - [ ] Component: Infrastructure
   - [ ] Component: Spark Runner
   - [ ] Component: Flink Runner
   - [ ] Component: Samza Runner
   - [ ] Component: Twister2 Runner
   - [ ] Component: Hazelcast Jet Runner
   - [X] Component: Google Cloud Dataflow Runner

