Does anyone have an opinion on this? A link to the PR: https://github.com/apache/spark/pull/26644
To make the discussion concrete, I've put two short sketches below the quoted mail: one of the custom type, and one of the rule change.
Cheers, Fokko

On Fri, 20 Dec 2019 at 16:00, Driesprong, Fokko <fo...@driesprong.frl> wrote:

> Folks,
>
> A while ago I opened a PR that adds the possibility to merge a custom
> data type into a native data type. This has become relevant with the
> introduction of Delta.
>
> Some background: I have a Dataset with fields of the type
> XMLGregorianCalendarType. I don't care about this type and would like
> to convert it to a standard data type, mainly because any other job
> that reads the data back would need the custom data type registered,
> which is not possible in the SQL API. The magic bit is that I override
> jsonValue to drop the information about the custom data type, making
> sure the field is serialized as a normal timestamp.
>
> Before Delta, appending to the table would go fine because
> compatibility was not checked on write. With Delta, things are
> different: when writing, it checks whether the two structures can be
> merged:
>
> OpenJDK 64-Bit Server VM warning: ignoring option MaxPermSize=512m; support was removed in 8.0
> Warning: Ignoring non-spark config property: eventLog.rolloverIntervalSeconds=3600
> Exception in thread "main" org.apache.spark.sql.AnalysisException: Failed to merge fields 'EventTimestamp' and 'EventTimestamp'. Failed to merge incompatible data types TimestampType and org.apache.spark.sql.types.CustomXMLGregorianCalendarType@6334178e;;
>   at com.databricks.sql.transaction.tahoe.schema.SchemaUtils$$anonfun$18.apply(SchemaUtils.scala:685)
>   at com.databricks.sql.transaction.tahoe.schema.SchemaUtils$$anonfun$18.apply(SchemaUtils.scala:674)
>   at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>   at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>   at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>   at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
>   at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
>   at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:186)
>   at com.databricks.sql.transaction.tahoe.schema.SchemaUtils$.com$databricks$sql$transaction$tahoe$schema$SchemaUtils$$merge$1(SchemaUtils.scala:674)
>   at com.databricks.sql.transaction.tahoe.schema.SchemaUtils$.mergeSchemas(SchemaUtils.scala:750)
>   at com.databricks.sql.transaction.tahoe.schema.ImplicitMetadataOperation$class.updateMetadata(ImplicitMetadataOperation.scala:63)
>   at com.databricks.sql.transaction.tahoe.commands.WriteIntoDelta.updateMetadata(WriteIntoDelta.scala:50)
>   at com.databricks.sql.transaction.tahoe.commands.WriteIntoDelta.write(WriteIntoDelta.scala:90)
>   at com.databricks.sql.transaction.tahoe.commands.CreateDeltaTableCommand$$anonfun$run$2.apply(CreateDeltaTableCommand.scala:119)
>   at com.databricks.sql.transaction.tahoe.commands.CreateDeltaTableCommand$$anonfun$run$2.apply(CreateDeltaTableCommand.scala:93)
>   at com.databricks.logging.UsageLogging$$anonfun$recordOperation$1.apply(UsageLogging.scala:405)
>   at com.databricks.logging.UsageLogging$$anonfun$withAttributionContext$1.apply(UsageLogging.scala:235)
>   at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
>   at com.databricks.logging.UsageLogging$class.withAttributionContext(UsageLogging.scala:230)
>   at com.databricks.spark.util.PublicDBLogging.withAttributionContext(DatabricksSparkUsageLogger.scala:18)
>   at com.databricks.logging.UsageLogging$class.withAttributionTags(UsageLogging.scala:272)
>   at com.databricks.spark.util.PublicDBLogging.withAttributionTags(DatabricksSparkUsageLogger.scala:18)
>   at com.databricks.logging.UsageLogging$class.recordOperation(UsageLogging.scala:386)
>   at com.databricks.spark.util.PublicDBLogging.recordOperation(DatabricksSparkUsageLogger.scala:18)
>   at com.databricks.spark.util.PublicDBLogging.recordOperation0(DatabricksSparkUsageLogger.scala:55)
>   at com.databricks.spark.util.DatabricksSparkUsageLogger.recordOperation(DatabricksSparkUsageLogger.scala:98)
>   at com.databricks.spark.util.UsageLogger$class.recordOperation(UsageLogger.scala:67)
>   at com.databricks.spark.util.DatabricksSparkUsageLogger.recordOperation(DatabricksSparkUsageLogger.scala:67)
>   at com.databricks.spark.util.UsageLogging$class.recordOperation(UsageLogger.scala:342)
>   at com.databricks.sql.transaction.tahoe.commands.CreateDeltaTableCommand.recordOperation(CreateDeltaTableCommand.scala:45)
>   at com.databricks.sql.transaction.tahoe.metering.DeltaLogging$class.recordDeltaOperation(DeltaLogging.scala:108)
>   at com.databricks.sql.transaction.tahoe.commands.CreateDeltaTableCommand.recordDeltaOperation(CreateDeltaTableCommand.scala:45)
>   at com.databricks.sql.transaction.tahoe.commands.CreateDeltaTableCommand.run(CreateDeltaTableCommand.scala:93)
>   at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
>   at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
>   at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:86)
>   at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:146)
>   at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:134)
>   at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$5.apply(SparkPlan.scala:187)
>   at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>   at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:183)
>   at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:134)
>   at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:115)
>   at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:115)
>   at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:710)
>   at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:710)
>   at org.apache.spark.sql.execution.SQLExecution$$anonfun$withCustomExecutionEnv$1.apply(SQLExecution.scala:111)
>   at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:240)
>   at org.apache.spark.sql.execution.SQLExecution$.withCustomExecutionEnv(SQLExecution.scala:97)
>   at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:170)
>   at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:710)
>   at org.apache.spark.sql.DataFrameWriter.createTable(DataFrameWriter.scala:508)
>   at org.apache.spark.sql.DataFrameWriter.saveAsTable(DataFrameWriter.scala:483)
>   at org.apache.spark.sql.DataFrameWriter.saveAsTable(DataFrameWriter.scala:430)
>   at com.ahold.IngestFild$.writeUnmanagedTable(IngestFild.scala:49)
>   at com.ahold.IngestFild$.ingestFild(IngestFild.scala:69)
>   at com.ahold.IngestFild$.main(IngestFild.scala:31)
>   at com.ahold.IngestFild.main(IngestFild.scala)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
>   at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
>   at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:845)
>   at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:161)
>   at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:184)
>   at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:86)
>   at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:920)
>   at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:929)
>   at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
>
> Currently, the merge function does not support UDTs, so I've extended
> the merge rules to handle them. The PR has been quiet for the last few
> weeks. Is this something we can merge into Spark? I would like to get
> your opinion on it.
>
> Cheers, Fokko
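For reference, the custom type from the quoted mail looks roughly like the
sketch below. This is a minimal illustration under my own assumptions (the
class layout, the millis-to-micros conversion, and the placement inside
org.apache.spark.sql.types, which is needed because jsonValue is
private[sql]), not the exact code from the job:

    package org.apache.spark.sql.types

    import java.util.GregorianCalendar
    import javax.xml.datatype.{DatatypeFactory, XMLGregorianCalendar}
    import org.json4s.JValue

    // Sketch only. It lives in Spark's own types package because
    // jsonValue is private[sql] (and the UDT API itself is not public
    // in recent Spark versions).
    class CustomXMLGregorianCalendarType extends UserDefinedType[XMLGregorianCalendar] {

      // Store the value as a plain Catalyst timestamp
      // (microseconds since the epoch).
      override def sqlType: DataType = TimestampType

      override def serialize(obj: XMLGregorianCalendar): Any =
        obj.toGregorianCalendar.getTimeInMillis * 1000L // millis -> micros

      override def deserialize(datum: Any): XMLGregorianCalendar = datum match {
        case micros: Long =>
          val cal = new GregorianCalendar()
          cal.setTimeInMillis(micros / 1000L) // micros -> millis
          DatatypeFactory.newInstance().newXMLGregorianCalendar(cal)
      }

      override def userClass: Class[XMLGregorianCalendar] = classOf[XMLGregorianCalendar]

      // The magic bit: describe the type in JSON as a plain timestamp
      // instead of the usual {"type": "udt", ...}, so a job that reads
      // the data back never needs the custom class to be registered.
      override private[sql] def jsonValue: JValue = TimestampType.jsonValue
    }

With the jsonValue override, the data on disk and its schema look like an
ordinary timestamp column; only the Delta-style merge check on write still
sees the UDT on the incoming side.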
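And the shape of the rule change is roughly this. Again a simplified sketch
in my own words rather than the code from the PR (the real merge logic is
recursive and handles many more cases), showing only the new UDT rule:

    package org.apache.spark.sql.types

    import org.apache.spark.SparkException

    object MergeRuleSketch {
      // New rule: when a UDT meets a native type and the UDT's
      // underlying sqlType lines up with it, resolve to the native
      // type instead of failing the merge.
      def merge(left: DataType, right: DataType): DataType = (left, right) match {
        case (udt: UserDefinedType[_], native) if udt.sqlType == native => native
        case (native, udt: UserDefinedType[_]) if udt.sqlType == native => native
        // Existing behaviour, simplified: equal types merge,
        // anything else is incompatible.
        case (l, r) if l == r => l
        case (l, r) =>
          throw new SparkException(s"Failed to merge incompatible data types $l and $r")
      }
    }

With a rule like this, merging TimestampType with the custom type above
resolves to plain TimestampType, which is exactly what the Delta write in
the stack trace needs.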