davidradl commented on code in PR #5:
URL:
https://github.com/apache/flink-connector-http/pull/5#discussion_r2581407254
##########
flink-connector-http/src/main/java/org/apache/flink/connector/http/sink/httpclient/JavaNetSinkHttpClient.java:
##########
@@ -114,7 +118,7 @@ private SinkHttpClientResponse
prepareSinkHttpClientResponse(
for (var response : responses) {
var sinkRequestEntry = response.getHttpRequest();
var optResponse = response.getResponse();
-
+
HttpLogger.getHttpLogger(properties).logResponse(response.getResponse().get());
Review Comment:
It sounded reasonable to make these changes - but if I do this I get 13 unit
test failures, it seems that making this change introduces a serialization
issue. Part of the test failure output is:
org.apache.flink.table.api.ValidationException: Function class 'class
org.apache.flink.connector.http.table.lookup.AsyncHttpTableLookupFunction' is
not serializable. Make sure that the class is self-contained (i.e. no
references to outer classes) and all inner fields are serializable as well.
at
org.apache.flink.table.functions.UserDefinedFunctionHelper.cleanFunction(UserDefinedFunctionHelper.java:595)
at
org.apache.flink.table.functions.UserDefinedFunctionHelper.prepareInstance(UserDefinedFunctionHelper.java:258)
at
org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecLookupJoin.createJoinTransformation(CommonExecLookupJoin.java:262)
at
org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecLookupJoin.translateToPlanInternal(StreamExecLookupJoin.java:197)
at
org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase.translateToPlan(ExecNodeBase.java:168)
at
org.apache.flink.table.planner.plan.nodes.exec.ExecEdge.translateToPlan(ExecEdge.java:259)
at
org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecCalc.translateToPlanInternal(CommonExecCalc.java:94)
at
org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase.translateToPlan(ExecNodeBase.java:168)
at
org.apache.flink.table.planner.plan.nodes.exec.ExecEdge.translateToPlan(ExecEdge.java:259)
at
org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.java:177)
at
org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase.translateToPlan(ExecNodeBase.java:168)
at
org.apache.flink.table.planner.delegation.StreamPlanner.$anonfun$translateToPlan$1(StreamPlanner.scala:85)
at
scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:286)
at scala.collection.Iterator.foreach(Iterator.scala:943)
at scala.collection.Iterator.foreach$(Iterator.scala:943)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
at scala.collection.IterableLike.foreach(IterableLike.scala:74)
at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
at scala.collection.TraversableLike.map(TraversableLike.scala:286)
at scala.collection.TraversableLike.map$(TraversableLike.scala:279)
at scala.collection.AbstractTraversable.map(Traversable.scala:108)
at
org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:84)
at
org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:180)
at
org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1308)
at
org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:1133)
at
org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:735)
at
org.apache.flink.connector.http.table.lookup.HttpLookupTableSourceITCaseTest.testLookupJoinOnRowType(HttpLookupTableSourceITCaseTest.java:597)
at java.base/java.lang.reflect.Method.invoke(Method.java:572)
at java.base/java.util.ArrayList.forEach(ArrayList.java:1541)
at java.base/java.util.ArrayList.forEach(ArrayList.java:1541)
Caused by: org.apache.flink.api.common.InvalidProgramException:
org.apache.flink.connector.http.HttpLogger@ced37eef is not serializable. The
object probably contains or references non serializable fields.
at
org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:170)
at
org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:138)
at
org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:138)
at
org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:138)
at
org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:138)
at
org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:69)
at
org.apache.flink.table.functions.UserDefinedFunctionHelper.cleanFunction(UserDefinedFunctionHelper.java:592)
... 30 more
Caused by: java.io.NotSerializableException:
org.apache.flink.connector.http.HttpLogger
at
java.base/java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1175)
at
java.base/java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:345)
at
org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:547)
at
org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:149)
... 36 more
[ERROR]
org.apache.flink.connector.http.table.lookup.HttpLookupTableSourceITCaseTest.testLookupJoinOnRowTypeAndRootColumn
Time elapsed: 0.675 s <<< ERROR!
org.apache.flink.table.api.ValidationException: Function class 'class
org.apache.flink.connector.http.table.lookup.AsyncHttpTableLookupFunction' is
not serializable. Make sure that the class is self-contained (i.e. no
references to outer classes) and all inner fields are serializable as well.
at
org.apache.flink.table.functions.UserDefinedFunctionHelper.cleanFunction(UserDefinedFunctionHelper.java:595)
at
org.apache.flink.table.functions.UserDefinedFunctionHelper.prepareInstance(UserDefinedFunctionHelper.java:258)
at
org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecLookupJoin.createJoinTransformation(CommonExecLookupJoin.java:262)
at
org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecLookupJoin.translateToPlanInternal(StreamExecLookupJoin.java:197)
at
org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase.translateToPlan(ExecNodeBase.java:168)
at
org.apache.flink.table.planner.plan.nodes.exec.ExecEdge.translateToPlan(ExecEdge.java:259)
at
org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecCalc.translateToPlanInternal(CommonExecCalc.java:94)
at
org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase.translateToPlan(ExecNodeBase.java:168)
at
org.apache.flink.table.planner.plan.nodes.exec.ExecEdge.translateToPlan(ExecEdge.java:259)
at
org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.java:177)
at
org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase.translateToPlan(ExecNodeBase.java:168)
at
org.apache.flink.table.planner.delegation.StreamPlanner.$anonfun$translateToPlan$1(StreamPlanner.scala:85)
at
scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:286)
at scala.collection.Iterator.foreach(Iterator.scala:943)
at scala.collection.Iterator.foreach$(Iterator.scala:943)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
at scala.collection.IterableLike.foreach(IterableLike.scala:74)
at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
at scala.collection.TraversableLike.map(TraversableLike.scala:286)
at scala.collection.TraversableLike.map$(TraversableLike.scala:279)
at scala.collection.AbstractTraversable.map(Traversable.scala:108)
at
org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:84)
at
org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:180)
at
org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1308)
at
org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:1133)
at
org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:735)
at
org.apache.flink.connector.http.table.lookup.HttpLookupTableSourceITCaseTest.testLookupJoinOnRowTypeAndRootColumn(HttpLookupTableSourceITCaseTest.java:668)
at java.base/java.lang.reflect.Method.invoke(Method.java:572)
at java.base/java.util.ArrayList.forEach(ArrayList.java:1541)
at java.base/java.util.ArrayList.forEach(ArrayList.java:1541)
Caused by: org.apache.flink.api.common.InvalidProgramException:
org.apache.flink.connector.http.HttpLogger@f6eb7b8d is not serializable. The
object probably contains or references non serializable fields.
at
org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:170)
at
org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:138)
at
org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:138)
at
org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:138)
at
org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:138)
at
org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:69)
at
org.apache.flink.table.functions.UserDefinedFunctionHelper.cleanFunction(UserDefinedFunctionHelper.java:592)
... 30 more
Caused by: java.io.NotSerializableException:
org.apache.flink.connector.http.HttpLogger
at
java.base/java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1175)
at
java.base/java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:345)
at
org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:547)
at
org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:149)
... 36 more
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]