devinbost commented on issue #4721: Flink pulsar sink NotSerializableException
URL: https://github.com/apache/pulsar/issues/4721#issuecomment-561965357
 
 
   Does anyone know of a workaround? I'm stuck on this exact issue. 
   
   You should be able to reproduce it with this example:
   
   
   ```
   import org.apache.flink.api.common.functions.AggregateFunction;
   import org.apache.flink.api.common.functions.FoldFunction;
   import org.apache.flink.api.common.functions.MapFunction;
   import org.apache.flink.api.common.serialization.SimpleStringSchema;
   import org.apache.flink.api.common.typeinfo.TypeHint;
   import org.apache.flink.api.common.typeinfo.TypeInformation;
   import org.apache.flink.api.java.tuple.Tuple2;
   import org.apache.flink.streaming.api.datastream.DataStream;
   import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
   import org.apache.flink.streaming.api.functions.source.SourceFunction;
   import 
org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows;
   import org.apache.flink.streaming.api.windowing.time.Time;
   import org.apache.flink.streaming.connectors.pulsar.FlinkPulsarProducer;
   import org.apache.flink.streaming.connectors.pulsar.PulsarSourceBuilder;
   import org.apache.pulsar.client.impl.auth.AuthenticationDisabled;
   import org.json.simple.JSONObject;
   import org.json.simple.parser.JSONParser;
   import org.json.simple.parser.ParseException;
   
   import static java.nio.charset.StandardCharsets.UTF_8;
   
   public class StreamingJob {
   
        public static Tuple2<String,String> mapToTuple(String incomingMessage) 
throws ParseException {
                JSONObject incomingObj = (JSONObject) new 
JSONParser().parse(incomingMessage);
                JSONObject correlationIdJson = (JSONObject) 
incomingObj.get("correlationId");
                String correlationId = "";
                if(correlationIdJson != null){
                        correlationId = correlationIdJson.toString();
                } // Put in try/catch to throw exception if correlationIdJson 
== null
                Tuple2 msgEnvelope = new Tuple2(correlationId, 
incomingObj.toString());
                return msgEnvelope;
        }
        private static class JsonConcatenator
                        implements AggregateFunction<Tuple2<String, String>, 
Tuple2<String, String>, String> {
                @Override
                public Tuple2<String, String> createAccumulator() {
                        return new Tuple2<String, String>("","");
                }
   
                @Override
                public Tuple2<String, String> add(Tuple2<String, String> value, 
Tuple2<String, String> accumulator) {
                        return new Tuple2<>(value.f0, accumulator.f1 + ", " + 
value.f1);
                }
   
                @Override
                public String getResult(Tuple2<String, String> accumulator) {
                        return "[" + accumulator.f1 + "]";
                }
   
                @Override
                public Tuple2<String, String> merge(Tuple2<String, String> a, 
Tuple2<String, String> b) {
                        return new Tuple2<>(a.f0, a.f1 + ", " + b.f1);
                }
        }
   
        public static void main(String[] args) throws Exception {
                // set up the streaming execution environment
                final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
   
                String SERVICE_URL = "pulsar://localhost:6650";
                String INPUT_TOPIC = 
"persistent://public/default/test-flink-jaeger-spanner-input";
                String SUBSCRIPTION_NAME = "test-jaeger-spanner";
                String OUTPUT_TOPIC = 
"persistent://public/default/test-flink-jaeger-spanner-output";
   
                PulsarSourceBuilder<String> builder = PulsarSourceBuilder
                                .builder(new SimpleStringSchema())
                                .serviceUrl(SERVICE_URL)
                                .topic(INPUT_TOPIC)
                                .subscriptionName(SUBSCRIPTION_NAME);
                SourceFunction<String> src = builder.build();
                DataStream<String> dataStream = env.addSource(src);
   
                DataStream<String> combinedEnvelopes = dataStream
                                .map(new MapFunction<String, Tuple2<String, 
String>>() {
                                        @Override
                                        public Tuple2 map(String 
incomingMessage) throws Exception {
                                                return 
mapToTuple(incomingMessage);
                                        }
                                })
                                .keyBy(0)
                                
.window(EventTimeSessionWindows.withGap(Time.seconds(20)))
                                .aggregate(new JsonConcatenator())
                                .returns(String.class);
   
                combinedEnvelopes.addSink(new FlinkPulsarProducer<>(
                                SERVICE_URL,
                                OUTPUT_TOPIC,
                                new AuthenticationDisabled(), // probably need 
to fix //  AuthenticationTls()
                                combinedData -> 
combinedData.toString().getBytes(UTF_8),
                                combinedData -> null)
                );
   
                // execute program
                env.execute("Flink Streaming Java API Skeleton");
        }
   }
   ```
   
   with this POM file:
   
   ```
   <project xmlns="http://maven.apache.org/POM/4.0.0"
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>
   
        <groupId>com.example</groupId>
        <artifactId>flink-poc</artifactId>
        <version>1.0-SNAPSHOT</version>
        <packaging>jar</packaging>
   
        <name>Flink Quickstart Job</name>
        <url>http://www.myorganization.org</url>
   
        <properties>
                
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
                <flink.version>1.9.0</flink.version>
                <java.version>1.8</java.version>
                <scala.binary.version>2.11</scala.binary.version>
                <maven.compiler.source>${java.version}</maven.compiler.source>
                <maven.compiler.target>${java.version}</maven.compiler.target>
                <pulsar.version>2.4.0</pulsar.version>
        </properties>
   
        <repositories>
                <repository>
                        <id>apache.snapshots</id>
                        <name>Apache Development Snapshot Repository</name>
                        
<url>https://repository.apache.org/content/repositories/snapshots/</url>
                        <releases>
                                <enabled>false</enabled>
                        </releases>
                        <snapshots>
                                <enabled>true</enabled>
                        </snapshots>
                </repository>
        </repositories>
   
        <dependencies>
                <!-- 
https://mvnrepository.com/artifact/io.jaegertracing/jaeger-client -->
                <dependency>
                        <groupId>io.jaegertracing</groupId>
                        <artifactId>jaeger-client</artifactId>
                        <version>1.0.0</version>
                </dependency>
                <dependency>
                        <groupId>org.apache.pulsar</groupId>
                        <artifactId>pulsar-functions-api</artifactId>
                        <version>${pulsar.version}</version>
                </dependency>
   
                <dependency>
                        <groupId>org.apache.pulsar</groupId>
                        <artifactId>pulsar-io-core</artifactId>
                        <version>${pulsar.version}</version>
                </dependency>
   
                <dependency>
                        <groupId>org.apache.pulsar</groupId>
                        <artifactId>pulsar-client</artifactId>
                        <version>2.4.0</version> <!-- What's the latest stable 
version???-->
                </dependency>
   
                <dependency>
                        <groupId>org.apache.pulsar</groupId>
                        <artifactId>pulsar-client-admin</artifactId>
                        <version>2.4.0</version> 
                </dependency>
                <!-- Apache Flink dependencies -->
                <!-- These dependencies are provided, because they should not 
be packaged into the JAR file. -->
                <dependency>
                        <groupId>org.apache.flink</groupId>
                        <artifactId>flink-java</artifactId>
                        <version>${flink.version}</version>
                        <scope>provided</scope>
                </dependency>
                <dependency>
                        <groupId>org.apache.flink</groupId>
                        
<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
                        <version>${flink.version}</version>
                        <scope>provided</scope>
                </dependency>
                <dependency>
                        <groupId>com.googlecode.json-simple</groupId>
                        <artifactId>json-simple</artifactId>
                        <version>1.1</version>
                </dependency>
                <dependency>
                        <groupId>org.projectlombok</groupId>
                        <artifactId>lombok</artifactId>
                        <version>1.18.6</version>
                        <scope>provided</scope>
                </dependency>
                <dependency>
                        <groupId>junit</groupId>
                        <artifactId>junit</artifactId>
                        <version>4.11</version>
                        <scope>test</scope>
                </dependency>
   
                <!-- 
https://mvnrepository.com/artifact/org.apache.pulsar/pulsar-flink -->
                <dependency>
                        <groupId>org.apache.pulsar</groupId>
                        <artifactId>pulsar-flink</artifactId>
                        <version>2.4.0</version>
                </dependency>
                <dependency>
                        <groupId>org.slf4j</groupId>
                        <artifactId>slf4j-log4j12</artifactId>
                        <version>1.7.7</version>
                        <scope>runtime</scope>
                </dependency>
                <dependency>
                        <groupId>log4j</groupId>
                        <artifactId>log4j</artifactId>
                        <version>1.2.17</version>
                        <scope>runtime</scope>
                </dependency>
        </dependencies>
   
        <build>
                <plugins>
   
                        <!-- Java Compiler -->
                        <plugin>
                                <groupId>org.apache.maven.plugins</groupId>
                                <artifactId>maven-compiler-plugin</artifactId>
                                <version>3.1</version>
                                <configuration>
                                        <source>${java.version}</source>
                                        <target>${java.version}</target>
                                </configuration>
                        </plugin>
   
                        <!-- We use the maven-shade plugin to create a fat jar 
that contains all necessary dependencies. -->
                        <!-- Change the value of <mainClass>...</mainClass> if 
your program entry point changes. -->
                        <plugin>
                                <groupId>org.apache.maven.plugins</groupId>
                                <artifactId>maven-shade-plugin</artifactId>
                                <version>3.0.0</version>
                                <executions>
                                        <!-- Run shade goal on package phase -->
                                        <execution>
                                                <phase>package</phase>
                                                <goals>
                                                        <goal>shade</goal>
                                                </goals>
                                                <configuration>
                                                        <artifactSet>
                                                                <excludes>
                                                                        
<exclude>org.apache.flink:force-shading</exclude>
                                                                        
<exclude>com.google.code.findbugs:jsr305</exclude>
                                                                        
<exclude>org.slf4j:*</exclude>
                                                                        
<exclude>log4j:*</exclude>
                                                                </excludes>
                                                        </artifactSet>
                                                        <filters>
                                                                <filter>
                                                                        <!-- Do 
not copy the signatures in the META-INF folder.
                                                                        
Otherwise, this might cause SecurityExceptions when using the JAR. -->
                                                                        
<artifact>*:*</artifact>
                                                                        
<excludes>
                                                                                
<exclude>META-INF/*.SF</exclude>
                                                                                
<exclude>META-INF/*.DSA</exclude>
                                                                                
<exclude>META-INF/*.RSA</exclude>
                                                                        
</excludes>
                                                                </filter>
                                                        </filters>
                                                        <transformers>
                                                                <transformer 
implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                                                        
<mainClass>com.overstock.dataeng.jaeger.spanner.StreamingJob</mainClass>
                                                                </transformer>
                                                        </transformers>
                                                </configuration>
                                        </execution>
                                </executions>
                        </plugin>
                </plugins>
   
                <pluginManagement>
                        <plugins>
                                <plugin>
                                        
<groupId>org.apache.maven.plugins</groupId>
                                        
<artifactId>maven-assembly-plugin</artifactId>
                                        <version>3.1.1</version>
                                        <configuration>
                                                <descriptorRefs>
                                                        
<descriptorRef>jar-with-dependencies</descriptorRef>
                                                </descriptorRefs>
                                        </configuration>
                                        <executions>
                                                <execution>
                                                        <id>assemble-all</id>
                                                        <phase>package</phase>
                                                        <goals>
                                                                
<goal>single</goal>
                                                        </goals>
                                                </execution>
                                        </executions>
                                </plugin>
                                <!-- This improves the out-of-the-box 
experience in Eclipse by resolving some warnings. -->
                                <plugin>
                                        <groupId>org.eclipse.m2e</groupId>
                                        
<artifactId>lifecycle-mapping</artifactId>
                                        <version>1.0.0</version>
                                        <configuration>
                                                <lifecycleMappingMetadata>
                                                        <pluginExecutions>
                                                                
<pluginExecution>
                                                                        
<pluginExecutionFilter>
                                                                                
<groupId>org.apache.maven.plugins</groupId>
                                                                                
<artifactId>maven-shade-plugin</artifactId>
                                                                                
<versionRange>[3.0.0,)</versionRange>
                                                                                
<goals>
                                                                                
        <goal>shade</goal>
                                                                                
</goals>
                                                                        
</pluginExecutionFilter>
                                                                        <action>
                                                                                
<ignore/>
                                                                        
</action>
                                                                
</pluginExecution>
                                                                
<pluginExecution>
                                                                        
<pluginExecutionFilter>
                                                                                
<groupId>org.apache.maven.plugins</groupId>
                                                                                
<artifactId>maven-compiler-plugin</artifactId>
                                                                                
<versionRange>[3.1,)</versionRange>
                                                                                
<goals>
                                                                                
        <goal>testCompile</goal>
                                                                                
        <goal>compile</goal>
                                                                                
</goals>
                                                                        
</pluginExecutionFilter>
                                                                        <action>
                                                                                
<ignore/>
                                                                        
</action>
                                                                
</pluginExecution>
                                                        </pluginExecutions>
                                                </lifecycleMappingMetadata>
                                        </configuration>
                                </plugin>
                        </plugins>
                </pluginManagement>
        </build>
   
        <!-- This profile helps to make things run out of the box in IntelliJ 
-->
        <!-- Its adds Flink's core classes to the runtime class path. -->
        <!-- Otherwise they are missing in IntelliJ, because the dependency is 
'provided' -->
        <profiles>
                <profile>
                        <id>add-dependencies-for-IDEA</id>
   
                        <activation>
                                <property>
                                        <name>idea.version</name>
                                </property>
                        </activation>
   
                        <dependencies>
                                <dependency>
                                        <groupId>org.apache.flink</groupId>
                                        <artifactId>flink-java</artifactId>
                                        <version>${flink.version}</version>
                                        <scope>compile</scope>
                                </dependency>
                                <dependency>
                                        <groupId>org.apache.flink</groupId>
                                        
<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
                                        <version>${flink.version}</version>
                                        <scope>compile</scope>
                                </dependency>
                        </dependencies>
                </profile>
        </profiles>
   
   </project>
   ```
   

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to