kkishoreyadav opened a new issue #2174:
URL: https://github.com/apache/iceberg/issues/2174
1) We are encountering the error below when appending Avro files from
GCS to an Iceberg table. The Avro files themselves are valid, but they are
written with the deflate codec; is that a concern?
Exception in thread "streaming-job-executor-0" java.lang.NoClassDefFoundError: org/apache/avro/InvalidAvroMagicException
    at org.apache.iceberg.avro.AvroIterable.newFileReader(AvroIterable.java:101)
    at org.apache.iceberg.avro.AvroIterable.iterator(AvroIterable.java:77)
    at org.apache.iceberg.avro.AvroIterable.iterator(AvroIterable.java:37)
    at org.apache.iceberg.relocated.com.google.common.collect.Iterables.addAll(Iterables.java:320)
    at org.apache.iceberg.relocated.com.google.common.collect.Lists.newLinkedList(Lists.java:237)
    at org.apache.iceberg.ManifestLists.read(ManifestLists.java:46)
    at org.apache.iceberg.BaseSnapshot.cacheManifests(BaseSnapshot.java:127)
    at org.apache.iceberg.BaseSnapshot.dataManifests(BaseSnapshot.java:149)
    at org.apache.iceberg.MergingSnapshotProducer.apply(MergingSnapshotProducer.java:343)
    at org.apache.iceberg.SnapshotProducer.apply(SnapshotProducer.java:163)
    at org.apache.iceberg.SnapshotProducer.lambda$commit$2(SnapshotProducer.java:276)
    at org.apache.iceberg.util.Tasks$Builder.runTaskWithRetry(Tasks.java:404)
    at org.apache.iceberg.util.Tasks$Builder.runSingleThreaded(Tasks.java:213)
    at org.apache.iceberg.util.Tasks$Builder.run(Tasks.java:197)
    at org.apache.iceberg.util.Tasks$Builder.run(Tasks.java:189)
    at org.apache.iceberg.SnapshotProducer.commit(SnapshotProducer.java:275)
    at com.snapchat.transformer.IcebergWriterImpl.appendData(IcebergWriterImpl.java:110)
    at com.snapchat.transformer.TransformerStreamingWorker.acknowledge(TransformerStreamingWorker.java:393)
    at com.snapchat.transformer.TransformerStreamingWorker.transformAndPublish(TransformerStreamingWorker.java:244)
    at com.snapchat.transformer.TransformerStreamingWorker.processBatch(TransformerStreamingWorker.java:208)
    at com.snapchat.transformer.TransformerStreamingWorker.processExactoBatchRDD(TransformerStreamingWorker.java:181)
    at com.snapchat.transformer.TransformerStreamingWorker.lambda$execute$d121240d$1(TransformerStreamingWorker.java:162)
    at org.apache.spark.streaming.api.java.JavaDStreamLike.$anonfun$foreachRDD$2(JavaDStreamLike.scala:280)
    at org.apache.spark.streaming.api.java.JavaDStreamLike.$anonfun$foreachRDD$2$adapted(JavaDStreamLike.scala:280)
    at org.apache.spark.streaming.dstream.ForEachDStream.$anonfun$generateJob$2(ForEachDStream.scala:51)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:416)
    at org.apache.spark.streaming.dstream.ForEachDStream.$anonfun$generateJob$1(ForEachDStream.scala:51)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at scala.util.Try$.apply(Try.scala:213)
    at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39)
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.$anonfun$run$1(JobScheduler.scala:257)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at scala.util.DynamicVariable.withValue(DynamicVariable.scala:62)
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:257)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassNotFoundException: org.apache.avro.InvalidAvroMagicException
    at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
    ... 38 more
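
For what it's worth, org.apache.avro.InvalidAvroMagicException only exists in
Avro 1.9+, so a NoClassDefFoundError for it usually means an older Avro jar
(1.8.x) is shadowing the newer one Iceberg 0.11 compiles against. That is
only my assumption about this cluster; here is a minimal, self-contained
sketch to check which Avro jar actually wins on the classpath:

import org.apache.avro.file.DataFileReader;

public class AvroClasspathCheck {
    public static void main(String[] args) {
        // Which jar are the Avro classes actually loaded from?
        System.out.println(DataFileReader.class
            .getProtectionDomain().getCodeSource().getLocation());

        // InvalidAvroMagicException was introduced in Avro 1.9, so its
        // absence means an older Avro jar is first on the classpath.
        try {
            Class.forName("org.apache.avro.InvalidAvroMagicException");
            System.out.println("Avro >= 1.9 is on the classpath");
        } catch (ClassNotFoundException e) {
            System.out.println("Avro < 1.9 is on the classpath");
        }
    }
}

Running the same check inside a Spark task would show the executor-side
classpath as well.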
2) The log shows that the Iceberg table already exists, but I cannot find
the metadata file in GCS. I am running the Spark job on a Dataproc cluster;
where should I look for the metadata file?
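
On (2), my understanding is that HadoopTables addresses a table by
filesystem location rather than by catalog name, so the metadata files
should sit under "<table location>/metadata/" on whatever filesystem that
location resolves to. A small sketch (the gs:// path is a made-up
placeholder for the real location):

import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.Table;
import org.apache.iceberg.hadoop.HadoopTables;

public class MetadataLocationCheck {
    public static void main(String[] args) {
        // HadoopTables keys tables by filesystem location, not catalog name.
        HadoopTables tables = new HadoopTables(new Configuration());
        // Placeholder path; substitute the actual table location.
        Table table = tables.load("gs://my-bucket/warehouse/transformer/my_stream");
        // Metadata JSON files and manifests live under <location>/metadata/
        System.out.println("metadata dir: " + table.location() + "/metadata");
    }
}

Since the appendData snippet below passes only tableIdentifier.name(), the
location is a bare relative name and would resolve against the
Configuration's default filesystem (HDFS on Dataproc), which might explain
why nothing shows up in the GCS bucket.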
#####################
Iceberg version: 0.11
Spark version: 3.0
#####################
public void appendData(List<FileMetadata> publishedFiles, Schema icebergSchema) {
    TableIdentifier tableIdentifier = TableIdentifier.of(TRANSFORMER, jobConfig.streamName());
    // PartitionSpec partitionSpec = IcebergInternalFields.getPartitionSpec(tableSchema);
    HadoopTables tables = new HadoopTables(new Configuration());
    PartitionSpec partitionSpec = PartitionSpec.builderFor(icebergSchema)
        .build();
    Table table;
    if (tables.exists(tableIdentifier.name())) {
        table = tables.load(tableIdentifier.name());
    } else {
        table = tables.create(
            icebergSchema,
            partitionSpec,
            tableIdentifier.name());
    }
    AppendFiles appendFiles = table.newAppend();
    for (FileMetadata fileMetadata : publishedFiles) {
        appendFiles.appendFile(DataFiles.builder(partitionSpec)
            .withPath(fileMetadata.getFilename())
            .withFileSizeInBytes(fileMetadata.getFileSize())
            .withRecordCount(fileMetadata.getRowCount())
            .withFormat(FileFormat.AVRO)
            .build());
    }
    appendFiles.commit();
}
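
Circling back to the deflate question in (1): as far as I can tell, the
codec only affects how blocks are encoded inside the Avro container; the
four-byte file magic ('O', 'b', 'j', 0x01) is identical for all codecs, so
deflate alone should not trigger an invalid-magic error. A standalone
sketch against the plain Avro API to confirm this:

import java.io.File;
import java.io.FileInputStream;
import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.avro.file.CodecFactory;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;

public class DeflateMagicCheck {
    public static void main(String[] args) throws Exception {
        Schema schema = SchemaBuilder.record("r").fields()
            .requiredString("f")
            .endRecord();

        // Write one record with deflate-compressed blocks.
        File file = new File("deflate-test.avro");
        try (DataFileWriter<GenericRecord> writer =
                 new DataFileWriter<>(new GenericDatumWriter<>(schema))) {
            writer.setCodec(CodecFactory.deflateCodec(6));
            writer.create(schema, file);
            GenericRecord record = new GenericData.Record(schema);
            record.put("f", "hello");
            writer.append(record);
        }

        // The container magic is codec-independent: 'O', 'b', 'j', 0x01.
        byte[] magic = new byte[4];
        try (FileInputStream in = new FileInputStream(file)) {
            in.read(magic);
        }
        System.out.printf("magic: %c%c%c 0x%02x%n",
            (char) magic[0], (char) magic[1], (char) magic[2], magic[3]);
    }
}

Note also that the trace above fails in ManifestLists.read, i.e. while
reading a manifest list that Iceberg itself wrote, not one of our deflate
data files, which again points at the Avro version on the classpath rather
than the codec.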