Anyone any opinion on this? A link to the PR:
https://github.com/apache/spark/pull/26644

Cheers, Fokko


Op vr 20 dec. 2019 om 16:00 schreef Driesprong, Fokko <fo...@driesprong.frl
>:

> Folks,
>
> I've opened a PR a while ago with a PR to merge the possibility to merge
> a custom data type, into a native data type. This is something new because
> of the introduction of Delta.
>
> To have some background, I'm having a DataSet that has fields of the type
> XMLGregorianCalendarType. I don't care about this type and would like to
> convert this to a standard data type. Mainly because, if I'm reading the
> data again using another job, it needs to have the customer data type being
> registered, which is not possible in the SQL API. The magic bit here is
> that I'm overriding the jsonValue to lose the information about the custom
> data type. In this case, you have to make sure that it is serialized as the
> normal timestamp.
>
> Before Delta, when appending to the table, everything would go fine
> because it would not check compatibility on write. Now with Delta, things
> are different. When writing, it will check if the two structures can be
> merged:
>
> OpenJDK 64-Bit Server VM warning: ignoring option MaxPermSize=512m;
> support was removed in 8.0
> Warning: Ignoring non-spark config property:
> eventLog.rolloverIntervalSeconds=3600
> Exception in thread "main" org.apache.spark.sql.AnalysisException: Failed
> to merge fields 'EventTimestamp' and 'EventTimestamp'. Failed to merge
> incompatible data types TimestampType and
> org.apache.spark.sql.types.CustomXMLGregorianCalendarType@6334178e;;
> at
> com.databricks.sql.transaction.tahoe.schema.SchemaUtils$$anonfun$18.apply(SchemaUtils.scala:685)
> at
> com.databricks.sql.transaction.tahoe.schema.SchemaUtils$$anonfun$18.apply(SchemaUtils.scala:674)
> at
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> at
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> at
> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
> at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
> at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
> at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:186)
> at
> com.databricks.sql.transaction.tahoe.schema.SchemaUtils$.com$databricks$sql$transaction$tahoe$schema$SchemaUtils$$merge$1(SchemaUtils.scala:674)
> at
> com.databricks.sql.transaction.tahoe.schema.SchemaUtils$.mergeSchemas(SchemaUtils.scala:750)
> at
> com.databricks.sql.transaction.tahoe.schema.ImplicitMetadataOperation$class.updateMetadata(ImplicitMetadataOperation.scala:63)
> at
> com.databricks.sql.transaction.tahoe.commands.WriteIntoDelta.updateMetadata(WriteIntoDelta.scala:50)
> at
> com.databricks.sql.transaction.tahoe.commands.WriteIntoDelta.write(WriteIntoDelta.scala:90)
> at
> com.databricks.sql.transaction.tahoe.commands.CreateDeltaTableCommand$$anonfun$run$2.apply(CreateDeltaTableCommand.scala:119)
> at
> com.databricks.sql.transaction.tahoe.commands.CreateDeltaTableCommand$$anonfun$run$2.apply(CreateDeltaTableCommand.scala:93)
> at
> com.databricks.logging.UsageLogging$$anonfun$recordOperation$1.apply(UsageLogging.scala:405)
> at
> com.databricks.logging.UsageLogging$$anonfun$withAttributionContext$1.apply(UsageLogging.scala:235)
> at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
> at
> com.databricks.logging.UsageLogging$class.withAttributionContext(UsageLogging.scala:230)
> at
> com.databricks.spark.util.PublicDBLogging.withAttributionContext(DatabricksSparkUsageLogger.scala:18)
> at
> com.databricks.logging.UsageLogging$class.withAttributionTags(UsageLogging.scala:272)
> at
> com.databricks.spark.util.PublicDBLogging.withAttributionTags(DatabricksSparkUsageLogger.scala:18)
> at
> com.databricks.logging.UsageLogging$class.recordOperation(UsageLogging.scala:386)
> at
> com.databricks.spark.util.PublicDBLogging.recordOperation(DatabricksSparkUsageLogger.scala:18)
> at
> com.databricks.spark.util.PublicDBLogging.recordOperation0(DatabricksSparkUsageLogger.scala:55)
> at
> com.databricks.spark.util.DatabricksSparkUsageLogger.recordOperation(DatabricksSparkUsageLogger.scala:98)
> at
> com.databricks.spark.util.UsageLogger$class.recordOperation(UsageLogger.scala:67)
> at
> com.databricks.spark.util.DatabricksSparkUsageLogger.recordOperation(DatabricksSparkUsageLogger.scala:67)
> at
> com.databricks.spark.util.UsageLogging$class.recordOperation(UsageLogger.scala:342)
> at
> com.databricks.sql.transaction.tahoe.commands.CreateDeltaTableCommand.recordOperation(CreateDeltaTableCommand.scala:45)
> at
> com.databricks.sql.transaction.tahoe.metering.DeltaLogging$class.recordDeltaOperation(DeltaLogging.scala:108)
> at
> com.databricks.sql.transaction.tahoe.commands.CreateDeltaTableCommand.recordDeltaOperation(CreateDeltaTableCommand.scala:45)
> at
> com.databricks.sql.transaction.tahoe.commands.CreateDeltaTableCommand.run(CreateDeltaTableCommand.scala:93)
> at
> org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
> at
> org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
> at
> org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:86)
> at
> org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:146)
> at
> org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:134)
> at
> org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$5.apply(SparkPlan.scala:187)
> at
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
> at
> org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:183)
> at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:134)
> at
> org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:115)
> at
> org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:115)
> at
> org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:710)
> at
> org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:710)
> at
> org.apache.spark.sql.execution.SQLExecution$$anonfun$withCustomExecutionEnv$1.apply(SQLExecution.scala:111)
> at
> org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:240)
> at
> org.apache.spark.sql.execution.SQLExecution$.withCustomExecutionEnv(SQLExecution.scala:97)
> at
> org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:170)
> at
> org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:710)
> at
> org.apache.spark.sql.DataFrameWriter.createTable(DataFrameWriter.scala:508)
> at
> org.apache.spark.sql.DataFrameWriter.saveAsTable(DataFrameWriter.scala:483)
> at
> org.apache.spark.sql.DataFrameWriter.saveAsTable(DataFrameWriter.scala:430)
> at com.ahold.IngestFild$.writeUnmanagedTable(IngestFild.scala:49)
> at com.ahold.IngestFild$.ingestFild(IngestFild.scala:69)
> at com.ahold.IngestFild$.main(IngestFild.scala:31)
> at com.ahold.IngestFild.main(IngestFild.scala)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at
> org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
> at org.apache.spark.deploy.SparkSubmit.org
> $apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:845)
> at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:161)
> at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:184)
> at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:86)
> at
> org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:920)
> at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:929)
> at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
>
> Currently, the merge function does not support UDT's. Therefore I've
> extended the rules. Last few weeks it was quiet at the PR. Is this
> something that we can merge into Spark? I would like to get your opinion on
> this.
>
>  Cheers, Fokko
>

Reply via email to