DanielMorales9 opened a new issue, #32865:
URL: https://github.com/apache/beam/issues/32865
### What happened?
When I attempt to write data into an hourly partitioned Iceberg table, the write fails regardless of whether the Beam schema declares `event_time` with the standard DATETIME field type or with the `SqlTypes.DATETIME` logical type. Iceberg's partition-key accessor expects a `long`, but Beam hands it the Datetime value unchanged. (A standalone sketch of the mismatch at the Iceberg level is included after the stack traces below.)
```java
package com.experiment.beam;

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.managed.Managed;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.logicaltypes.SqlTypes;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.values.Row;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.hadoop.HadoopCatalog;
import org.apache.iceberg.types.Types;
import org.joda.time.Instant;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

import java.io.File;
import java.time.LocalDateTime;
import java.time.temporal.ChronoUnit;
import java.util.*;

import static org.junit.Assert.assertTrue;

@RunWith(Parameterized.class)
public class HourlyPartitionedIcebergTableTest {

    // Constants for testing
    private static final String WAREHOUSE_DIR =
            new File("src/test/resources/iceberg-test").getAbsolutePath();
    private static final String WAREHOUSE_PATH = String.format("file:///%s", WAREHOUSE_DIR);
    private static final Map<String, Object> CATALOG_CONFIG = new HashMap<String, Object>() {{
        put("warehouse", WAREHOUSE_PATH);
        put("type", "hadoop");
    }};
    private static final Map<String, Object> ICEBERG_CONFIG = new HashMap<String, Object>() {{
        put("catalog_name", "iceberg");
        put("catalog_properties", CATALOG_CONFIG);
    }};
    private static final Configuration CONF = new Configuration();
    private static final HadoopCatalog HADOOP_CATALOG = new HadoopCatalog(CONF, WAREHOUSE_PATH);

    private final String schemaName;
    private final String tableName;
    private final Schema beamSchema;
    private final List<Row> testData;
    private TableIdentifier tableIdentifier;

    @Parameterized.Parameters
    public static Collection<Object[]> data() {
        // SqlTypes.DATETIME logical type: values are java.time.LocalDateTime
        Schema schemaWithSQLDatetime = Schema.builder()
                .addStringField("id")
                .addLogicalTypeField("event_time", SqlTypes.DATETIME)
                .build();
        // Standard Beam DATETIME field: values are joda-time Instants
        Schema schemaWithDatetime = Schema.builder()
                .addStringField("id")
                .addDateTimeField("event_time")
                .build();
        return Arrays.asList(new Object[][]{
                {
                        "myschema",
                        "test1",
                        schemaWithDatetime,
                        Arrays.asList(
                                Row.withSchema(schemaWithDatetime)
                                        .addValues("record-1", Instant.now())
                                        .build(),
                                Row.withSchema(schemaWithDatetime)
                                        .addValues("record-2", Instant.now().minus(3600 * 1000))
                                        .build()
                        )
                },
                {
                        "myschema",
                        "test2",
                        schemaWithSQLDatetime,
                        Arrays.asList(
                                Row.withSchema(schemaWithSQLDatetime)
                                        .addValues("record-3", LocalDateTime.now())
                                        .build(),
                                Row.withSchema(schemaWithSQLDatetime)
                                        .addValues("record-4", LocalDateTime.now().minus(1, ChronoUnit.HOURS))
                                        .build()
                        )
                },
        });
    }

    public HourlyPartitionedIcebergTableTest(String schemaName, String tableName,
                                             Schema beamSchema, List<Row> hardcodedData) {
        this.schemaName = schemaName;
        this.tableName = tableName;
        this.beamSchema = beamSchema;
        this.testData = hardcodedData;
    }

    @Before
    public void setUp() {
        // Create the table identifier for the test
        tableIdentifier = TableIdentifier.of(schemaName, tableName);
        // Define the Iceberg schema
        org.apache.iceberg.Schema icebergSchema = new org.apache.iceberg.Schema(
                Types.NestedField.required(1, "id", Types.StringType.get()),
                Types.NestedField.required(2, "event_time", Types.TimestampType.withZone())
        );
        // Partition the table by hour(event_time)
        PartitionSpec spec = PartitionSpec.builderFor(icebergSchema)
                .hour("event_time")
                .build();
        // Create the Iceberg table
        HADOOP_CATALOG.createTable(tableIdentifier, icebergSchema, spec);
    }

    @Test
    public void testPipeline() {
        // Define Beam pipeline options
        PipelineOptions options = PipelineOptionsFactory.create();
        Pipeline pipeline = Pipeline.create(options);
        ICEBERG_CONFIG.put("table", String.format("%s.%s", schemaName, tableName));

        // Build the pipeline using the hardcoded data
        pipeline
                .apply("CreateHardcodedData", Create.of(testData).withRowSchema(beamSchema))
                .apply("WriteToIceberg", Managed.write(Managed.ICEBERG).withConfig(ICEBERG_CONFIG));

        // Run the pipeline
        pipeline.run().waitUntilFinish();

        // Verify that the table exists after data is written
        assertTrue("Table should exist after writing data.",
                HADOOP_CATALOG.tableExists(tableIdentifier));
    }

    @After
    public void tearDown() {
        // Drop the table after the test
        if (HADOOP_CATALOG.tableExists(tableIdentifier)) {
            HADOOP_CATALOG.dropTable(tableIdentifier);
        }
    }
}
```
Test Results:
```text
org.apache.beam.sdk.Pipeline$PipelineExecutionException: java.lang.IllegalStateException: Not an instance of java.lang.Long: 2024-10-18T16:14:32.934Z
    at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:377)
    at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:345)
    at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:218)
    at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:67)
    at org.apache.beam.sdk.Pipeline.run(Pipeline.java:325)
    at org.apache.beam.sdk.Pipeline.run(Pipeline.java:310)
    at com.experiment.beam.HourlyPartitionedIcebergTableTest.testPipeline(HourlyPartitionedIcebergTableTest.java:136)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:566)
    at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
    at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
    at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
    at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
    at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
    at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
    at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
    at org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
    at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366)
    at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
    at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
    at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
    at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
    at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
    at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
    at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
    at org.junit.runners.ParentRunner.run(ParentRunner.java:413)
    at org.junit.runners.Suite.runChild(Suite.java:128)
    at org.junit.runners.Suite.runChild(Suite.java:27)
    at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
    at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
    at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
    at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
    at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
    at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
    at org.junit.runners.ParentRunner.run(ParentRunner.java:413)
    at org.junit.runners.Suite.runChild(Suite.java:128)
    at org.junit.runners.Suite.runChild(Suite.java:27)
    at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
    at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
    at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
    at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
    at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
    at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
    at org.junit.runners.ParentRunner.run(ParentRunner.java:413)
    at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
    at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:69)
    at com.intellij.rt.junit.IdeaTestRunner$Repeater$1.execute(IdeaTestRunner.java:38)
    at com.intellij.rt.execution.junit.TestsRepeater.repeat(TestsRepeater.java:11)
    at com.intellij.rt.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:35)
    at com.intellij.rt.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:235)
    at com.intellij.rt.junit.JUnitStarter.main(JUnitStarter.java:54)
Caused by: java.lang.IllegalStateException: Not an instance of java.lang.Long: 2024-10-18T16:14:32.934Z
    at org.apache.iceberg.data.GenericRecord.get(GenericRecord.java:123)
    at org.apache.iceberg.Accessors$PositionAccessor.get(Accessors.java:71)
    at org.apache.iceberg.Accessors$PositionAccessor.get(Accessors.java:58)
    at org.apache.iceberg.PartitionKey.partition(PartitionKey.java:106)
    at org.apache.beam.sdk.io.iceberg.RecordWriterManager$DestinationState.write(RecordWriterManager.java:136)
    at org.apache.beam.sdk.io.iceberg.RecordWriterManager.write(RecordWriterManager.java:270)
    at org.apache.beam.sdk.io.iceberg.WriteUngroupedRowsToFiles$WriteUngroupedRowsToFilesDoFn.processElement(WriteUngroupedRowsToFiles.java:243)
```
```text
org.apache.beam.sdk.Pipeline$PipelineExecutionException: java.lang.IllegalStateException: Not an instance of java.lang.Long: 2024-10-18T17:14:32.961579Z
    at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:377)
    at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:345)
    at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:218)
    at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:67)
    at org.apache.beam.sdk.Pipeline.run(Pipeline.java:325)
    at org.apache.beam.sdk.Pipeline.run(Pipeline.java:310)
    at com.experiment.beam.HourlyPartitionedIcebergTableTest.testPipeline(HourlyPartitionedIcebergTableTest.java:136)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:566)
    at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
    at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
    at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
    at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
    at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
    at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
    at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
    at org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
    at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366)
    at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
    at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
    at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
    at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
    at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
    at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
    at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
    at org.junit.runners.ParentRunner.run(ParentRunner.java:413)
    at org.junit.runners.Suite.runChild(Suite.java:128)
    at org.junit.runners.Suite.runChild(Suite.java:27)
    at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
    at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
    at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
    at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
    at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
    at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
    at org.junit.runners.ParentRunner.run(ParentRunner.java:413)
    at org.junit.runners.Suite.runChild(Suite.java:128)
    at org.junit.runners.Suite.runChild(Suite.java:27)
    at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
    at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
    at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
    at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
    at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
    at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
    at org.junit.runners.ParentRunner.run(ParentRunner.java:413)
    at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
    at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:69)
    at com.intellij.rt.junit.IdeaTestRunner$Repeater$1.execute(IdeaTestRunner.java:38)
    at com.intellij.rt.execution.junit.TestsRepeater.repeat(TestsRepeater.java:11)
    at com.intellij.rt.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:35)
    at com.intellij.rt.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:235)
    at com.intellij.rt.junit.JUnitStarter.main(JUnitStarter.java:54)
Caused by: java.lang.IllegalStateException: Not an instance of java.lang.Long: 2024-10-18T17:14:32.961579Z
    at org.apache.iceberg.data.GenericRecord.get(GenericRecord.java:123)
    at org.apache.iceberg.Accessors$PositionAccessor.get(Accessors.java:71)
    at org.apache.iceberg.Accessors$PositionAccessor.get(Accessors.java:58)
    at org.apache.iceberg.PartitionKey.partition(PartitionKey.java:106)
    at org.apache.beam.sdk.io.iceberg.RecordWriterManager$DestinationState.write(RecordWriterManager.java:136)
    at org.apache.beam.sdk.io.iceberg.RecordWriterManager.write(RecordWriterManager.java:270)
    at org.apache.beam.sdk.io.iceberg.WriteUngroupedRowsToFiles$WriteUngroupedRowsToFilesDoFn.processElement(WriteUngroupedRowsToFiles.java:243)
```
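For reference, the `Caused by` frames point at Iceberg's `PartitionKey`, whose accessors read a `timestamptz` field in Iceberg's internal representation, i.e. a `Long` holding microseconds since the epoch. The following standalone sketch is my own illustration (not Beam code; it assumes only the Iceberg core/data modules and joda-time on the classpath) and reproduces the same `IllegalStateException` whenever the timestamp field holds anything other than a `Long`:

```java
import org.apache.iceberg.PartitionKey;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.data.GenericRecord;
import org.apache.iceberg.types.Types;

import java.time.Instant;
import java.time.temporal.ChronoUnit;

public class PartitionKeyTypeMismatchSketch {
    public static void main(String[] args) {
        // Same schema and hourly partition spec as the repro above
        Schema schema = new Schema(
                Types.NestedField.required(1, "id", Types.StringType.get()),
                Types.NestedField.required(2, "event_time", Types.TimestampType.withZone()));
        PartitionSpec spec = PartitionSpec.builderFor(schema).hour("event_time").build();
        PartitionKey partitionKey = new PartitionKey(spec, schema);

        // Iceberg's internal representation for timestamptz is epoch microseconds
        // stored as a Long, so this succeeds:
        GenericRecord ok = GenericRecord.create(schema);
        ok.setField("id", "record-1");
        ok.setField("event_time", ChronoUnit.MICROS.between(Instant.EPOCH, Instant.now()));
        partitionKey.partition(ok);

        // Storing the timestamp as a non-Long value (here a joda Instant, which is
        // what the error message above shows) fails with the same exception:
        GenericRecord bad = GenericRecord.create(schema);
        bad.setField("id", "record-2");
        bad.setField("event_time", org.joda.time.Instant.now());
        partitionKey.partition(bad); // IllegalStateException: Not an instance of java.lang.Long
    }
}
```

So it looks like the Beam Iceberg sink would need to convert the row's timestamp to epoch microseconds (or wrap the record in an internal representation) before calling `PartitionKey.partition`, which the write path above does not appear to do.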
### Issue Priority
Priority: 2 (default / most bugs should be filed as P2)
### Issue Components
- [ ] Component: Python SDK
- [X] Component: Java SDK
- [ ] Component: Go SDK
- [ ] Component: Typescript SDK
- [X] Component: IO connector
- [ ] Component: Beam YAML
- [ ] Component: Beam examples
- [ ] Component: Beam playground
- [ ] Component: Beam katas
- [ ] Component: Website
- [ ] Component: Infrastructure
- [ ] Component: Spark Runner
- [ ] Component: Flink Runner
- [ ] Component: Samza Runner
- [ ] Component: Twister2 Runner
- [ ] Component: Hazelcast Jet Runner
- [X] Component: Google Cloud Dataflow Runner