ilhanadiyaman commented on pull request #12159:
URL: https://github.com/apache/druid/pull/12159#issuecomment-1023724261
Hi @JulianJaffePinterest, thank you for your great work. We are considering using this connector in production; however, while testing this PR, we encountered an error when writing a segment that includes a `thetaSketch`. We first read a segment directly from deep storage by providing `reader.segments` (a read sketch follows the table below):
| number | count | animal | __time        | animalTheta                                       |
|--------|-------|--------|---------------|---------------------------------------------------|
| 32     | 1     | bear   | 1515459660000 | [01 03 03 00 00 3A CC 93 5E 90 53 47 FF 46 AD 47] |
| 3      | 1     | bird   | 1515466860000 | [01 03 03 00 00 3A CC 93 AA 00 F4 1D D3 FF F8 14] |
| 4      | 1     | tiger  | 1515466860000 | [01 03 03 00 00 3A CC 93 28 32 FA 04 88 6A BA 4B] |
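For reference, the read looks roughly like this. This is a minimal sketch, assuming the PR's `druid` DataSource V2 short name; `serializedSegmentsJson` stands in for our real JSON-serialized segment descriptors:
```java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

SparkSession spark = SparkSession.builder().getOrCreate();
String serializedSegmentsJson = "..."; // JSON-serialized DataSegment descriptors (elided)

// Read the segment files straight out of deep storage via the descriptors.
Dataset<Row> segment = spark
    .read()
    .format("druid")
    .option("reader.segments", serializedSegmentsJson)
    .load();
segment.show(false);
```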
Then we updated `bear` to `giraffe` (a minimal sketch of this step follows the table below) and tried to write the segment back with `writer.metrics` and `writer.dimensions` provided:
| number | count | animal  | __time        | animalTheta                                       |
|--------|-------|---------|---------------|---------------------------------------------------|
| 32     | 1     | giraffe | 1515459660000 | [01 03 03 00 00 3A CC 93 91 0C 5B F9 33 1B E6 17] |
| 3      | 1     | bird    | 1515466860000 | [01 03 03 00 00 3A CC 93 AA 00 F4 1D D3 FF F8 14] |
| 4      | 1     | tiger   | 1515466860000 | [01 03 03 00 00 3A CC 93 28 32 FA 04 88 6A BA 4B] |
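The update itself is an ordinary column rewrite; a minimal sketch, where `segment` is the Dataset read above:
```java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.when;

// Rewrite the animal dimension: rows with "bear" become "giraffe"; every other
// column, including the animalTheta sketch bytes, passes through unchanged.
Dataset<Row> updated = segment.withColumn(
    "animal",
    when(col("animal").equalTo("bear"), "giraffe").otherwise(col("animal")));
```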
```java
writeOptions.put("writer.dimensions","[ \"animal\" ]");
writeOptions.put("writer.metrics","[ { \"type\": \"count\", \"name\":
\"count\" }, { \"type\": \"longSum\", \"name\": \"number\", \"fieldName\":
\"number\" }, { \"name\": \"animalTheta\", \"type\": \"thetaSketch\",
\"fieldName\": \"animal\", \"isInputThetaSketch\": true } ]");
```
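The write call that consumes these options looks roughly like the sketch below; `writeOptions` is a `java.util.Map<String, String>`, and any further options the writer requires (datasource name, deep storage config, etc.) are elided here:
```java
import org.apache.spark.sql.SaveMode;

// Hand the updated rows back to the connector's DataSource V2 writer.
updated
    .write()
    .format("druid")
    .options(writeOptions)
    .mode(SaveMode.Overwrite)
    .save();
```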
The write operation fails with the error below:
```
org.apache.druid.java.util.common.ISE: Object is not of a type[class org.apache.spark.unsafe.types.UTF8String] that can be deserialized to sketch.
    at org.apache.druid.query.aggregation.datasketches.theta.SketchHolder.deserialize(SketchHolder.java:223) ~[druid-spark.jar:0.22.0-SNAPSHOT]
    at org.apache.druid.query.aggregation.datasketches.theta.SketchMergeComplexMetricSerde$1.extractValue(SketchMergeComplexMetricSerde.java:62) ~[druid-spark.jar:0.22.0-SNAPSHOT]
    at org.apache.druid.query.aggregation.datasketches.theta.SketchMergeComplexMetricSerde$1.extractValue(SketchMergeComplexMetricSerde.java:50) ~[druid-spark.jar:0.22.0-SNAPSHOT]
    at org.apache.druid.segment.serde.ComplexMetricExtractor.extractValue(ComplexMetricExtractor.java:41) ~[druid-spark.jar:0.22.0-SNAPSHOT]
    at org.apache.druid.segment.incremental.IncrementalIndex$1IncrementalIndexInputRowColumnSelectorFactory$1.getObject(IncrementalIndex.java:184) ~[druid-spark.jar:0.22.0-SNAPSHOT]
    at org.apache.druid.query.aggregation.datasketches.theta.SketchAggregator.aggregate(SketchAggregator.java:54) ~[druid-spark.jar:0.22.0-SNAPSHOT]
    at org.apache.druid.segment.incremental.OnheapIncrementalIndex.doAggregate(OnheapIncrementalIndex.java:254) ~[druid-spark.jar:0.22.0-SNAPSHOT]
    at org.apache.druid.segment.incremental.OnheapIncrementalIndex.addToFacts(OnheapIncrementalIndex.java:167) ~[druid-spark.jar:0.22.0-SNAPSHOT]
    at org.apache.druid.segment.incremental.IncrementalIndex.add(IncrementalIndex.java:481) ~[druid-spark.jar:0.22.0-SNAPSHOT]
    at org.apache.druid.segment.incremental.IncrementalIndex.add(IncrementalIndex.java:462) ~[druid-spark.jar:0.22.0-SNAPSHOT]
    at org.apache.druid.spark.v2.writer.DruidDataWriter.write(DruidDataWriter.scala:171) ~[druid-spark.jar:0.22.0-SNAPSHOT]
    at org.apache.druid.spark.v2.writer.DruidDataWriter.write(DruidDataWriter.scala:68) ~[druid-spark.jar:0.22.0-SNAPSHOT]
    at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.$anonfun$run$2(WriteToDataSourceV2Exec.scala:118) ~[spark-sql_2.12-2.4.8.jar:2.4.8]
    at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1394) ~[spark-core_2.12-2.4.8.jar:2.4.8]
    at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.run(WriteToDataSourceV2Exec.scala:116) ~[spark-sql_2.12-2.4.8.jar:2.4.8]
    at org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2Exec.$anonfun$doExecute$2(WriteToDataSourceV2Exec.scala:67) ~[spark-sql_2.12-2.4.8.jar:2.4.8]
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90) ~[spark-core_2.12-2.4.8.jar:2.4.8]
    at org.apache.spark.scheduler.Task.run(Task.scala:123) ~[spark-core_2.12-2.4.8.jar:2.4.8]
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:414) ~[spark-core_2.12-2.4.8.jar:2.4.8]
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360) [spark-core_2.12-2.4.8.jar:2.4.8]
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:417) [spark-core_2.12-2.4.8.jar:2.4.8]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_271]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_271]
    at java.lang.Thread.run(Thread.java:748) [?:1.8.0_271]
```
We tried to debug it but couldn't get anywhere. It seems that the writer tries to deserialize a `UTF8String`, even though we provide the theta sketches as byte arrays (a rough paraphrase of the failing branch follows the screenshot).
<img width="875" alt="Screenshot 2022-01-27 at 23 54 02"
src="https://user-images.githubusercontent.com/4068390/151456714-2e98c961-5988-4cab-8ffd-83ccba120af0.png">
Do you have any ideas on how we can resolve this issue?
P.S. The Azure deep storage implementation is working; we didn't encounter any problems there.