[ 
https://issues.apache.org/jira/browse/FLINK-7398?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Haohui Mai reassigned FLINK-7398:
---------------------------------

    Assignee: Haohui Mai

> Table API operators/UDFs must not store Logger
> ----------------------------------------------
>
>                 Key: FLINK-7398
>                 URL: https://issues.apache.org/jira/browse/FLINK-7398
>             Project: Flink
>          Issue Type: Bug
>          Components: Table API & SQL
>    Affects Versions: 1.4.0, 1.3.2
>            Reporter: Aljoscha Krettek
>            Assignee: Haohui Mai
>            Priority: Blocker
>             Fix For: 1.4.0, 1.3.3
>
>
> Table API operators and UDFs store a reference to the (slf4j) {{Logger}} in 
> an instance field (c.f. 
> https://github.com/apache/flink/blob/f37988c19adc30d324cde83c54f2fa5d36efb9e7/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/FlatMapRunner.scala#L39).
>  This means that the {{Logger}} will be serialised with the UDF and sent to 
> the cluster. This in itself does not sound right and leads to problems when 
> the slf4j configuration on the Client is different from the cluster 
> environment.
> This is an example of a user running into that problem: 
> https://lists.apache.org/thread.html/01dd44007c0122d60c3fd2b2bb04fd2d6d2114bcff1e34d1d2079522@%3Cuser.flink.apache.org%3E.
>  Here, they have Logback on the client but the Logback classes are not 
> available on the cluster and so deserialisation of the UDFs fails with a 
> {{ClassNotFoundException}}.
> This is a rough list of the involved classes:
> {code}
> src/main/scala/org/apache/flink/table/catalog/ExternalCatalogSchema.scala:43: 
>  private val LOG: Logger = LoggerFactory.getLogger(this.getClass)
> src/main/scala/org/apache/flink/table/catalog/ExternalTableSourceUtil.scala:45:
>   private val LOG: Logger = LoggerFactory.getLogger(this.getClass)
> src/main/scala/org/apache/flink/table/codegen/calls/BuiltInMethods.scala:28:  
> val LOG10 = Types.lookupMethod(classOf[Math], "log10", classOf[Double])
> src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupAggregate.scala:62:
>   private val LOG = LoggerFactory.getLogger(this.getClass)
> src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala:59:
>   private val LOG = LoggerFactory.getLogger(this.getClass)
> src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala:51:
>   private val LOG = LoggerFactory.getLogger(this.getClass)
> src/main/scala/org/apache/flink/table/plan/nodes/FlinkConventions.scala:28:  
> val LOGICAL: Convention = new Convention.Impl("LOGICAL", 
> classOf[FlinkLogicalRel])
> src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala:38:  val 
> LOGICAL_OPT_RULES: RuleSet = RuleSets.ofList(
> src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateAggFunction.scala:36:
>   val LOG = LoggerFactory.getLogger(this.getClass)
> src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetAggFunction.scala:43:
>   val LOG = LoggerFactory.getLogger(this.getClass)
> src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetFinalAggFunction.scala:44:
>   val LOG = LoggerFactory.getLogger(this.getClass)
> src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetPreAggFunction.scala:44:
>   val LOG = LoggerFactory.getLogger(this.getClass)
> src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSessionWindowAggregatePreProcessor.scala:55:
>   val LOG = LoggerFactory.getLogger(this.getClass)
> src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSessionWindowAggReduceGroupFunction.scala:66:
>   val LOG = LoggerFactory.getLogger(this.getClass)
> src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSlideWindowAggReduceGroupFunction.scala:56:
>   val LOG = LoggerFactory.getLogger(this.getClass)
> src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSlideTimeWindowAggReduceGroupFunction.scala:64:
>   val LOG = LoggerFactory.getLogger(this.getClass)
> src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetTumbleCountWindowAggReduceGroupFunction.scala:46:
>   val LOG = LoggerFactory.getLogger(this.getClass)
> src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetWindowAggMapFunction.scala:52:
>   val LOG = LoggerFactory.getLogger(this.getClass)
> src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetTumbleTimeWindowAggReduceGroupFunction.scala:55:
>   val LOG = LoggerFactory.getLogger(this.getClass)
> src/main/scala/org/apache/flink/table/runtime/aggregate/GroupAggProcessFunction.scala:48:
>   val LOG: Logger = LoggerFactory.getLogger(this.getClass)
> src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeBoundedRowsOver.scala:66:
>   val LOG = LoggerFactory.getLogger(this.getClass)
> src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeBoundedRangeOver.scala:61:
>   val LOG = LoggerFactory.getLogger(this.getClass)
> src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeUnboundedOver.scala:47:
>   val LOG = LoggerFactory.getLogger(this.getClass)
> src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeBoundedRangeOver.scala:67:
>   val LOG = LoggerFactory.getLogger(this.getClass)
> src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeBoundedRowsOver.scala:72:
>   val LOG = LoggerFactory.getLogger(this.getClass)
> src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeUnboundedOver.scala:60:
>   val LOG = LoggerFactory.getLogger(this.getClass)
> src/main/scala/org/apache/flink/table/runtime/CorrelateFlatMapRunner.scala:40:
>   val LOG: Logger = LoggerFactory.getLogger(this.getClass)
> src/main/scala/org/apache/flink/table/runtime/CRowCorrelateProcessRunner.scala:45:
>   val LOG: Logger = LoggerFactory.getLogger(this.getClass)
> src/main/scala/org/apache/flink/table/runtime/CRowInputMapRunner.scala:41:  
> val LOG = LoggerFactory.getLogger(this.getClass)
> src/main/scala/org/apache/flink/table/runtime/CRowInputTupleOutputMapRunner.scala:44:
>   val LOG = LoggerFactory.getLogger(this.getClass)
> src/main/scala/org/apache/flink/table/runtime/CRowInputTupleOutputMapRunner.scala:77:
>   val LOG = LoggerFactory.getLogger(this.getClass)
> src/main/scala/org/apache/flink/table/runtime/CRowOutputMapRunner.scala:41:  
> val LOG = LoggerFactory.getLogger(this.getClass)
> src/main/scala/org/apache/flink/table/runtime/CRowProcessRunner.scala:43:  
> val LOG = LoggerFactory.getLogger(this.getClass)
> src/main/scala/org/apache/flink/table/runtime/FlatJoinRunner.scala:37:  val 
> LOG = LoggerFactory.getLogger(this.getClass)
> src/main/scala/org/apache/flink/table/runtime/FlatMapRunner.scala:39:  val 
> LOG = LoggerFactory.getLogger(this.getClass)
> src/main/scala/org/apache/flink/table/runtime/io/CRowValuesInputFormat.scala:39:
>   val LOG = LoggerFactory.getLogger(this.getClass)
> src/main/scala/org/apache/flink/table/runtime/io/ValuesInputFormat.scala:38:  
> val LOG = LoggerFactory.getLogger(this.getClass)
> src/main/scala/org/apache/flink/table/runtime/MapRunner.scala:36:  val LOG = 
> LoggerFactory.getLogger(this.getClass)
> src/main/scala/org/apache/flink/table/runtime/MapSideJoinRunner.scala:37:  
> val LOG = LoggerFactory.getLogger(this.getClass)
> {code}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

Reply via email to