[
https://issues.apache.org/jira/browse/FLINK-23978?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17704990#comment-17704990
]
Alexey Novakov commented on FLINK-23978:
----------------------------------------
I am not sure either. I have tried one more time without flink-adt library.
This time the exception is different:
{code:scala}
import io.findify.flink.api._
//import io.findify.flinkadt.api._
import org.apache.flink.api.common.typeinfo.TypeInformation
@main def wordCountExample =
val env = StreamExecutionEnvironment.getExecutionEnvironment
implicit val ti = TypeInformation.of(classOf[String])
val text = env.fromElements("To be, or not to be,--that is the question:--")
implicit val tiTuple = TypeInformation.of(classOf[(String, Int)])
text
.flatMap(_.toLowerCase.split("\\W+"))
.map(Tuple2(_, 1))
.keyBy(_._1)
.sum(1)
.print() env.execute("wordCount") {code}
{code:java}
[error] Exception in thread "main"
org.apache.flink.api.common.typeutils.CompositeType$InvalidFieldReferenceException:
Cannot reference field by position on GenericType<scala.Tuple2>Referencing a
field by position is supported on tuples, case classes, and arrays.
Additionally, you can select the 0th field of a primitive/basic type (e.g. int).
[error] at
org.apache.flink.streaming.util.typeutils.FieldAccessorFactory.getAccessor(FieldAccessorFactory.java:113)
[error] at
org.apache.flink.streaming.api.functions.aggregation.SumAggregator.<init>(SumAggregator.java:41)
[error] at
io.findify.flink.api.KeyedStream.aggregate(KeyedStream.scala:423)
[error] at io.findify.flink.api.KeyedStream.sum(KeyedStream.scala:353)
[error] at
org.example.wordCount$package$.wordCountExample(wordCount.scala:25)
[error] at org.example.wordCountExample.main(wordCount.scala:7) {code}
It is kind of saying that the type is GenericType<scala.Tuple2>
> FieldAccessor has direct scala dependency
> -----------------------------------------
>
> Key: FLINK-23978
> URL: https://issues.apache.org/jira/browse/FLINK-23978
> Project: Flink
> Issue Type: Sub-task
> Components: API / DataStream
> Reporter: Chesnay Schepler
> Assignee: Chesnay Schepler
> Priority: Major
> Labels: pull-request-available
> Fix For: 1.15.0
>
>
> The FieldAccessor class in flink-streaming-java has a hard dependency on
> scala. It would be ideal if we could restrict this dependencies to
> flink-streaming-scala.
> We could move the SimpleProductFieldAccessor & RecursiveProductFieldAccessor
> to flink-streaming-scala, and load them in the FieldAccessorFactory via
> reflection.
> This is one of a few steps that would allow the Java Datastream API to be
> used without scala being on the classpath.
--
This message was sent by Atlassian Jira
(v8.20.10#820010)