Thank you, I will try using coalesce, as my data does not contain any nulls. -Sandeep
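A minimal sketch of that coalesce approach in Java, assuming a Dataset<Row> named tweetRows with the columns from the table schema shown later in this thread; the default values here are placeholders:

import static org.apache.spark.sql.functions.coalesce;
import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.lit;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

// coalesce(column, non-null default) is non-nullable in Spark, so the
// resulting columns are treated as required when Iceberg validates the
// write schema.
Dataset<Row> nonNull = tweetRows
    .withColumn("mwId", coalesce(col("mwId"), lit("unknown")))
    .withColumn("mwVersion", coalesce(col("mwVersion"), lit(0L)))
    .withColumn("id", coalesce(col("id"), lit(0L)))
    .withColumn("id_str", coalesce(col("id_str"), lit("0")));

nonNull.write()
    .format("iceberg")
    .mode("append")
    .save(TweetItem.getTableLocation());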
On Thu, Aug 22, 2019 at 3:34 PM Ryan Blue <rb...@netflix.com> wrote:

> Datasets work fine with Iceberg. The problem was that your dataset's
> schema was incompatible with the table schema you created, because your
> dataset could contain null values for required fields in your table.
>
> All you should need to do is guarantee that those fields do not contain
> null values, either by using coalesce(nullable_col, "default-val") or by
> filtering out the rows with nulls.
>
> On Thu, Aug 22, 2019 at 3:04 PM Sandeep Sagar <sandeep.sa...@meltwater.com>
> wrote:
>
>> Hi Ryan,
>> I changed my code to use a DataFrame instead of a Dataset and it worked,
>> as I was able to specify a Spark schema.
>>
>> Does this mean we cannot use Datasets with Iceberg, or am I using them
>> incorrectly?
>>
>> Including the working code below:
>>
>> thanks
>> Sandeep
>>
>> List<Row> rows = new ArrayList<Row>(tweets.size());
>>
>> tweets.stream().forEach(c -> {
>>     rows.add(c.getSparkRow());
>> });
>>
>> Dataset<Row> tweetRows =
>>     dlS3Connector.getSparkSession().createDataFrame(rows,
>>         TweetItem.getSparkSchema());
>>
>> tweetRows.write()
>>     .format("iceberg")
>>     .mode("append")
>>     .save(TweetItem.getTableLocation());
>>
>> On Thu, Aug 22, 2019 at 1:07 PM Sandeep Sagar <
>> sandeep.sa...@meltwater.com> wrote:
>>
>>> This is how I created the dataset:
>>> tweet1 and tweet2 are two objects I created by setting every member
>>> attribute.
>>>
>>> List<TweetItem> tweets = Arrays.asList(tweet1, tweet2);
>>>
>>> Dataset<TweetItem> dsTweets =
>>>     dlS3Connector.getSparkSession().createDataset(tweets,
>>>         Encoders.bean(TweetItem.class));
>>>
>>> The "columns" field shows up if I do printSchema on dsTweets:
>>>
>>> dsTweets.printSchema();
>>>
>>> root
>>>  |-- columns: struct (nullable = true)
>>>  |-- created_at: string (nullable = true)
>>>  |-- encoder: struct (nullable = true)
>>>  |-- id: long (nullable = true)
>>>  |-- id_str: string (nullable = true)
>>>  |-- lang: string (nullable = true)
>>>  |-- mwId: string (nullable = true)
>>>  |-- mwVersion: long (nullable = true)
>>>  |-- partitionSpec: struct (nullable = true)
>>>  |-- schema: struct (nullable = true)
>>>  |    |-- aliases: map (nullable = true)
>>>  |    |    |-- key: string
>>>  |    |    |-- value: integer (valueContainsNull = true)
>>>  |-- tableLocation: string (nullable = true)
>>>  |-- text: string (nullable = true)
>>>
>>> So it seems to be inserted by the createDataset API.
>>>
>>> Hope this clarifies.
>>>
>>> thanks
>>> Sandeep
>>>
>>> On Thu, Aug 22, 2019 at 12:51 PM Ryan Blue <rb...@netflix.com> wrote:
>>>
>>>> I don't know how you produced the dataset, so I can't tell you how you
>>>> ended up with a "columns" column. But I can recommend a solution to
>>>> the required/optional problem.
>>>>
>>>> I think what's happening is that Spark doesn't know whether your
>>>> dataset contains nulls for those columns or not. To ensure that they
>>>> don't contain nulls, you should be able to filter out the rows where
>>>> those are null. Alternatively, you can add a coalesce with the column
>>>> and a non-null value to set a default wherever the column contains a
>>>> null. Then Spark should detect that the columns can't contain nulls
>>>> and should be able to write.
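A plausible explanation for the extra fields, though it is not confirmed in this thread: Encoders.bean() maps every JavaBean getter/setter property to a column, so if TweetItem carries its own Iceberg helpers as bean properties, they leak into the dataset schema. A hypothetical sketch (the real TweetItem class is not shown in the thread; setters omitted for brevity):

import java.io.Serializable;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;

public class TweetItem implements Serializable {
    // Tweet data: these should be the only columns.
    private String mwId;
    private long id;

    // Helper state: Encoders.bean(TweetItem.class) maps these bean
    // properties to columns too, which would explain why "schema",
    // "partitionSpec", and "tableLocation" appear in printSchema().
    private Schema schema;
    private PartitionSpec partitionSpec;
    private String tableLocation;

    public String getMwId() { return mwId; }
    public long getId() { return id; }
    public Schema getSchema() { return schema; }
    public PartitionSpec getPartitionSpec() { return partitionSpec; }
    public String getTableLocation() { return tableLocation; }
}

Moving those helpers off the bean, or selecting only the data columns before writing, would keep them out of the dataset.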
>>>> On Thu, Aug 22, 2019 at 12:39 PM Sandeep Sagar <
>>>> sandeep.sa...@meltwater.com> wrote:
>>>>
>>>>> I tried what you suggested and it got past that point, but ran into
>>>>> the following:
>>>>>
>>>>> java.lang.IllegalArgumentException: Cannot write incompatible
>>>>> dataframe to table with schema:
>>>>> table {
>>>>>   1: mwId: required string
>>>>>   2: mwVersion: required long
>>>>>   3: id: required long
>>>>>   4: id_str: required string
>>>>>   5: text: optional string
>>>>>   6: created_at: optional string
>>>>>   7: lang: optional string
>>>>> }
>>>>> Problems:
>>>>> * mwId should be required, but is optional
>>>>> * mwVersion should be required, but is optional
>>>>> * id should be required, but is optional
>>>>> * id_str should be required, but is optional
>>>>>
>>>>> So, if I create a Dataset by using a select, it marks every column as
>>>>> optional by default, and I don't see anything in the Javadoc about
>>>>> how to reflect the table schema in the Dataset.
>>>>> Also, when I was debugging the earlier issue, I noticed that it was
>>>>> querying the field struct for a field named "columns", which is
>>>>> present in the schema but not in my incoming dataset, hence the NPE.
>>>>> Why would the incoming data have that?
>>>>>
>>>>> thanks
>>>>> Sandeep
>>>>>
>>>>> On Thu, Aug 22, 2019 at 12:13 PM Ryan Blue <rb...@netflix.com.invalid>
>>>>> wrote:
>>>>>
>>>>>> Hi Sandeep,
>>>>>>
>>>>>> It looks like the problem is that your schemas don't match. There
>>>>>> are columns in your dataset that don't appear in your table schema,
>>>>>> like tableLocation. When Iceberg tries to match up the dataset's
>>>>>> schema with the table's schema, it can't find those fields by name
>>>>>> and hits an error.
>>>>>>
>>>>>> I think if you add a select, it should work:
>>>>>>
>>>>>> myDS.select("mwId", "mwVersion", "id", "id_str", "text",
>>>>>>         "created_at", "lang").write()
>>>>>>     .format("iceberg")
>>>>>>     .mode("append")
>>>>>>     .save(getTableLocation());
>>>>>>
>>>>>> Iceberg has code to catch the schema mismatch and throw an
>>>>>> exception, but it looks like it runs after the point where this is
>>>>>> failing. We should fix Iceberg to correctly assign IDs so you get a
>>>>>> better error message. I'll open an issue for this.
>>>>>>
>>>>>> Another fix is also coming in the next Spark release. In 2.4, Spark
>>>>>> doesn't validate the schema of a dataset when writing. That is fixed
>>>>>> in master and will be in the 3.0 release.
>>>>>>
>>>>>> rb
>>>>>>
>>>>>> On Thu, Aug 22, 2019 at 11:39 AM Sandeep Sagar <
>>>>>> sandeep.sa...@meltwater.com> wrote:
>>>>>>
>>>>>>> Hello,
>>>>>>> Newbie here. I need help figuring out the issue below.
>>>>>>> I am doing a simple local Spark save using Iceberg with S3.
>>>>>>> I can see that my metadata folder was created in S3, so my
>>>>>>> schema/table creation was successful.
>>>>>>> When I try to run a Spark write, I get a NullPointerException:
>>>>>>>
>>>>>>> java.lang.NullPointerException
>>>>>>>   at org.apache.iceberg.types.ReassignIds.field(ReassignIds.java:77)
>>>>>>>   at org.apache.iceberg.types.ReassignIds.field(ReassignIds.java:28)
>>>>>>>   at org.apache.iceberg.types.TypeUtil$VisitFieldFuture.get(TypeUtil.java:331)
>>>>>>>   at com.google.common.collect.Iterators$6.transform(Iterators.java:783)
>>>>>>>   at com.google.common.collect.TransformedIterator.next(TransformedIterator.java:47)
>>>>>>>   at com.google.common.collect.Iterators.addAll(Iterators.java:356)
>>>>>>>   at com.google.common.collect.Lists.newArrayList(Lists.java:143)
>>>>>>>   at com.google.common.collect.Lists.newArrayList(Lists.java:130)
>>>>>>>   at org.apache.iceberg.types.ReassignIds.struct(ReassignIds.java:55)
>>>>>>>   at org.apache.iceberg.types.ReassignIds.struct(ReassignIds.java:28)
>>>>>>>   at org.apache.iceberg.types.TypeUtil.visit(TypeUtil.java:364)
>>>>>>>   at org.apache.iceberg.types.TypeUtil$VisitFuture.get(TypeUtil.java:316)
>>>>>>>   at org.apache.iceberg.types.ReassignIds.schema(ReassignIds.java:40)
>>>>>>>   at org.apache.iceberg.types.ReassignIds.schema(ReassignIds.java:28)
>>>>>>>   at org.apache.iceberg.types.TypeUtil.visit(TypeUtil.java:336)
>>>>>>>   at org.apache.iceberg.types.TypeUtil.reassignIds(TypeUtil.java:137)
>>>>>>>   at org.apache.iceberg.spark.SparkSchemaUtil.convert(SparkSchemaUtil.java:163)
>>>>>>>   at org.apache.iceberg.spark.source.IcebergSource.validateWriteSchema(IcebergSource.java:146)
>>>>>>>   at org.apache.iceberg.spark.source.IcebergSource.createWriter(IcebergSource.java:76)
>>>>>>>   at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:255)
>>>>>>>   at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:229)
>>>>>>>
>>>>>>> Maybe I am making a mistake in schema creation?
>>>>>>>
>>>>>>> new Schema(
>>>>>>>     required(1, "mwId", Types.StringType.get()),
>>>>>>>     required(2, "mwVersion", Types.LongType.get()),
>>>>>>>     required(3, "id", Types.LongType.get()),
>>>>>>>     required(4, "id_str", Types.StringType.get()),
>>>>>>>     optional(5, "text", Types.StringType.get()),
>>>>>>>     optional(6, "created_at", Types.StringType.get()),
>>>>>>>     optional(7, "lang", Types.StringType.get())
>>>>>>> );
>>>>>>>
>>>>>>> The PartitionSpec I used was PartitionSpec.unpartitioned().
>>>>>>>
>>>>>>> The write code I used was:
>>>>>>>
>>>>>>> Dataset<TweetItem> myDS;
>>>>>>>
>>>>>>> ...... (populate myDS)
>>>>>>>
>>>>>>> myDS.write()
>>>>>>>     .format("iceberg")
>>>>>>>     .mode("append")
>>>>>>>     .save(getTableLocation());
>>>>>>>
>>>>>>> If I do a printSchema, I get:
>>>>>>>
>>>>>>> root
>>>>>>>  |-- columns: struct (nullable = true)
>>>>>>>  |-- created_at: string (nullable = true)
>>>>>>>  |-- encoder: struct (nullable = true)
>>>>>>>  |-- id: long (nullable = true)
>>>>>>>  |-- id_str: string (nullable = true)
>>>>>>>  |-- lang: string (nullable = true)
>>>>>>>  |-- mwId: string (nullable = true)
>>>>>>>  |-- mwVersion: long (nullable = true)
>>>>>>>  |-- partitionSpec: struct (nullable = true)
>>>>>>>  |-- schema: struct (nullable = true)
>>>>>>>  |    |-- aliases: map (nullable = true)
>>>>>>>  |    |    |-- key: string
>>>>>>>  |    |    |-- value: integer (valueContainsNull = true)
>>>>>>>  |-- tableLocation: string (nullable = true)
>>>>>>>  |-- text: string (nullable = true)
>>>>>>>
>>>>>>> Appreciate your help.
>>>>>>>
>>>>>>> regards
>>>>>>> Sandeep
>>>>>>
>>>>>> --
>>>>>> Ryan Blue
>>>>>> Software Engineer
>>>>>> Netflix
>>>>
>>>> --
>>>> Ryan Blue
>>>> Software Engineer
>>>> Netflix
>
> --
> Ryan Blue
> Software Engineer
> Netflix
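For context, a minimal sketch of how a path-based table like this is typically created with Iceberg's HadoopTables API; this is an assumption, since the thread does not show the table-creation code, and the location string is a placeholder:

import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.hadoop.HadoopTables;

// `schema` is the Schema shown earlier in the thread. Creating the
// table writes the "metadata" folder that Sandeep saw in S3;
// subsequent appends add data files and new metadata versions under
// the same location, which is also the path passed to save().
HadoopTables tables = new HadoopTables(new Configuration());
Table table = tables.create(schema, PartitionSpec.unpartitioned(),
    "s3://my-bucket/warehouse/tweets");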