boneanxs commented on pull request #4954:
URL: https://github.com/apache/hudi/pull/4954#issuecomment-1061481475
Hi guys, I also met this exception when enable async clustering in a
HoodieSparkStreaming job, not the same as the stacktrace this issue hit,
following is the stacktrace I met,
```java
ERROR AsyncClusteringService: Clustering executor failed
java.util.concurrent.CompletionException: org.apache.spark.SparkException: Task
not serializable
at
java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:273)
at
java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:280)
at
java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1606)
at
java.util.concurrent.CompletableFuture$AsyncSupply.exec(CompletableFuture.java:1596)
at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
at
java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
at
java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157)
Caused by: org.apache.spark.SparkException: Task not serializable
at
org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:416)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:406)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:162)
at org.apache.spark.SparkContext.clean(SparkContext.scala:2467)
at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndex$1(RDD.scala:912)
at
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:414)
at org.apache.spark.rdd.RDD.mapPartitionsWithIndex(RDD.scala:911)
at
org.apache.spark.api.java.JavaRDDLike.mapPartitionsWithIndex(JavaRDDLike.scala:103)
at
org.apache.spark.api.java.JavaRDDLike.mapPartitionsWithIndex$(JavaRDDLike.scala:99)
at
org.apache.spark.api.java.AbstractJavaRDDLike.mapPartitionsWithIndex(JavaRDDLike.scala:45)
at
org.apache.hudi.table.action.commit.SparkBulkInsertHelper.bulkInsert(SparkBulkInsertHelper.java:115)
at
org.apache.hudi.client.clustering.run.strategy.SparkSortAndSizeExecutionStrategy.performClusteringWithRecordsRDD(SparkSortAndSizeExecutionStrategy.java:68)
at
org.apache.hudi.client.clustering.run.strategy.MultipleSparkJobExecutionStrategy.lambda$runClusteringForGroupAsync$4(MultipleSparkJobExecutionStrategy.java:175)
at
java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1604)
... 5 more
Caused by: java.util.ConcurrentModificationException
at
java.util.LinkedHashMap$LinkedHashIterator.nextNode(LinkedHashMap.java:719)
at java.util.LinkedHashMap$LinkedKeyIterator.next(LinkedHashMap.java:742)
at java.util.HashSet.writeObject(HashSet.java:287)
at sun.reflect.GeneratedMethodAccessor54.invoke(Unknown Source)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1140)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496)
at
java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at
java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
at
java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at
java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
at
java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at
java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
at
java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at
java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
at
java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1378)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1174)
at
java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
at
java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
at
org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:44)
at org.apache.spark.serializer.JavaSerializerInstance
```
From my perspective, it might be `TypedProperties#keys` is not thread safe,
and another thread is trying to change this HashSet(like `put` or `putall` from
TypedProperties) while spark is trying to iterate it to serialize it at the
same time. `TypedProperties` could be used by HoodieTable's
config(HoodieWriteConfig), so this pr could fix it by avoiding HoodieTable to
be serialized.
But when I'm trying to solve it with the same way as this pr used,
Unfortunately found there could be a lot changes to avoid serializing
HoodieTable (Change construction methods of `BulkInsertMapFunction`,
`SparkLazyInsertIterable`, `HoodieLazyInsertIterable`, and many kinds of
`WriteHandler`), I'm afraid this could be a huge change.
Another solution is to make `TypedProperties` thread-safe, there are two
ways to make `TypedProperties` thread-safe
1. Only change keys to be `Collections.newSetFromMap(new
ConcurrentHashMap<>())`, this could avoid `ConcurrentModificationException`,
but `TypedProperties` is not really thread-safe, as modify attribute `keys` and
save key-value pair is divided into two steps, for example,
```java
// Synchronized is not work actually, because get methods are not
synchronized
public synchronized Object put(Object key, Object value) {
keys.remove(key);
keys.add(key);
// This could cause key is added in keys, but its value is not saved by
TypedProperties
return super.put(key, value);
}
```
2. Not let `TypedProperties` to extend `Properties`, use an internal
`ConcurrentHashMap` to save key and values, this could make `TypedProperties`
to be real thread-safe.
```java
public class TypedProperties implements Serializable {
private final ConcurrentHashMap<Object, Object> props = new
ConcurrentHashMap<Object, Object>();
public TypedProperties() {
}
public TypedProperties(Properties defaults) {
if (Objects.nonNull(defaults)) {
for (String key : defaults.stringPropertyNames()) {
put(key, defaults.getProperty(key));
}
}
}
public Enumeration<Object> keys() {
return Collections.enumeration(props.keySet());
}
...
```
Do you guys have any other suggestions? Thanks~
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]