kkishoreyadav opened a new issue #2174:
URL: https://github.com/apache/iceberg/issues/2174


   1) We are encountering the following error when appending AVRO files from 
GCS to table. The avro files are valid but we use deflated avro, is that a 
concern?
   
   
   Exception in thread "streaming-job-executor-0" 
java.lang.NoClassDefFoundError: org/apache/avro/InvalidAvroMagicException at 
org.apache.iceberg.avro.AvroIterable.newFileReader(AvroIterable.java:101) at 
org.apache.iceberg.avro.AvroIterable.iterator(AvroIterable.java:77) at 
org.apache.iceberg.avro.AvroIterable.iterator(AvroIterable.java:37) at 
org.apache.iceberg.relocated.com.google.common.collect.Iterables.addAll(Iterables.java:320)
 at 
org.apache.iceberg.relocated.com.google.common.collect.Lists.newLinkedList(Lists.java:237)
 at org.apache.iceberg.ManifestLists.read(ManifestLists.java:46) at 
org.apache.iceberg.BaseSnapshot.cacheManifests(BaseSnapshot.java:127) at 
org.apache.iceberg.BaseSnapshot.dataManifests(BaseSnapshot.java:149) at 
org.apache.iceberg.MergingSnapshotProducer.apply(MergingSnapshotProducer.java:343)
 at org.apache.iceberg.SnapshotProducer.apply(SnapshotProducer.java:163) at 
org.apache.iceberg.SnapshotProducer.lambda$commit$2(SnapshotProducer.java:276) 
at org.apache.ice
 berg.util.Tasks$Builder.runTaskWithRetry(Tasks.java:404) at 
org.apache.iceberg.util.Tasks$Builder.runSingleThreaded(Tasks.java:213) at 
org.apache.iceberg.util.Tasks$Builder.run(Tasks.java:197) at 
org.apache.iceberg.util.Tasks$Builder.run(Tasks.java:189) at 
org.apache.iceberg.SnapshotProducer.commit(SnapshotProducer.java:275) at 
com.snapchat.transformer.IcebergWriterImpl.appendData(IcebergWriterImpl.java:110)
 at 
com.snapchat.transformer.TransformerStreamingWorker.acknowledge(TransformerStreamingWorker.java:393)
 at 
com.snapchat.transformer.TransformerStreamingWorker.transformAndPublish(TransformerStreamingWorker.java:244)
 at 
com.snapchat.transformer.TransformerStreamingWorker.processBatch(TransformerStreamingWorker.java:208)
 at 
com.snapchat.transformer.TransformerStreamingWorker.processExactoBatchRDD(TransformerStreamingWorker.java:181)
 at 
com.snapchat.transformer.TransformerStreamingWorker.lambda$execute$d121240d$1(TransformerStreamingWorker.java:162)
 at org.apache.spark.streaming.ap
 i.java.JavaDStreamLike.$anonfun$foreachRDD$2(JavaDStreamLike.scala:280) at 
org.apache.spark.streaming.api.java.JavaDStreamLike.$anonfun$foreachRDD$2$adapted(JavaDStreamLike.scala:280)
 at 
org.apache.spark.streaming.dstream.ForEachDStream.$anonfun$generateJob$2(ForEachDStream.scala:51)
 at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23) at 
org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:416)
 at 
org.apache.spark.streaming.dstream.ForEachDStream.$anonfun$generateJob$1(ForEachDStream.scala:51)
 at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23) at 
scala.util.Try$.apply(Try.scala:213) at 
org.apache.spark.streaming.scheduler.Job.run(Job.scala:39) at 
org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.$anonfun$run$1(JobScheduler.scala:257)
 at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23) at 
scala.util.DynamicVariable.withValue(DynamicVariable.scala:62) at 
org.apache.spark.stre
 aming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:257) at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) 
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) 
at java.lang.Thread.run(Thread.java:748)Caused by: 
java.lang.ClassNotFoundException: org.apache.avro.InvalidAvroMagicException at 
java.net.URLClassLoader.findClass(URLClassLoader.java:382) at 
java.lang.ClassLoader.loadClass(ClassLoader.java:418) at 
java.lang.ClassLoader.loadClass(ClassLoader.java:351) ... 38 more
   
   2) The log shows that table already exists for the iceberg table, however I 
am unable to see the metadata file in gcs? I am running the spark job from the 
dataproc cluster, where can i see the metadata file?
   
   #####################
   Iceberg version: 0.11
   spark version 3.0
   #####################
   
       public void appendData(List<FileMetadata> publishedFiles, Schema 
icebergSchema) {
           TableIdentifier tableIdentifier = TableIdentifier.of(TRANSFORMER, 
jobConfig.streamName());
           // PartitionSpec partitionSpec = 
IcebergInternalFields.getPartitionSpec(tableSchema);
           HadoopTables tables = new HadoopTables(new Configuration());
   
          
           PartitionSpec partitionSpec = PartitionSpec.builderFor(icebergSchema)
                   .build();
   
           Table table = null;
           if (tables.exists(tableIdentifier.name())) {
               table = tables.load(tableIdentifier.name());
           } else {
               table = tables.create(
                       icebergSchema,
                       partitionSpec,
                       tableIdentifier.name());
           }
           AppendFiles appendFiles = table.newAppend();
           for (FileMetadata fileMetadata : publishedFiles) {
   
               appendFiles.appendFile(DataFiles.builder(partitionSpec)
                       .withPath(fileMetadata.getFilename())
                       .withFileSizeInBytes(fileMetadata.getFileSize())
                       .withRecordCount(fileMetadata.getRowCount())
                       .withFormat(FileFormat.AVRO)
                       .build());
           }
           appendFiles.commit();
       }


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to