gfalcone opened a new issue, #29902:
URL: https://github.com/apache/beam/issues/29902
### What happened?
Hello!
I have a streaming job processing messages from Pub/Sub that no longer works
using Beam 2.52.0 with the Flink Runner (in detached mode).
The pipeline works fine with Beam 2.51.0.
Here is the code of the pipeline:
```java
package com.mycompany.reco_videocatalog;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.net.URISyntaxException;
import java.security.GeneralSecurityException;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class VideoCatalogWriteJob {
private static final Logger logger =
LoggerFactory.getLogger(VideoCatalogWriteJob.class);
public static void main(String[] args) throws URISyntaxException,
UnsupportedEncodingException, IOException, GeneralSecurityException,
InterruptedException {
// Register our custom options interface and parse command
line arguments.
PipelineOptionsFactory.register(Options.class);
final Options options =
PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
// Create the pipeline with the specified options.
final Pipeline p = Pipeline.create(options);
// Read input data
p
.apply("Read from Pub/Sub",
PubsubIO.readStrings().fromSubscription(options.getOplogPubsubSubscription()));
// Start pipeline execution.
p.run();
}
}
```
With the pom.xml:
```xml
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.mycompany</groupId>
<artifactId>data-octopus</artifactId>
<version>1.5.2</version>
<name>data-octopus</name>
<description>Recommendation features computation batch and real-time
jobs</description>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<jdk.version>1.8</jdk.version>
<maven-compiler-plugin.version>3.5.1</maven-compiler-plugin.version>
<org.slf4j.version>1.7.21</org.slf4j.version>
<ch.qos.logback.version>1.4.14</ch.qos.logback.version>
<commons-math3.version>3.6.1</commons-math3.version>
<guava.version>33.0.0-jre</guava.version>
<beam.version>2.52.0</beam.version>
<jackson.version>2.16.1</jackson.version>
<aerospike-client.version>5.3.0</aerospike-client.version>
<flink.artifact.name>beam-runners-flink-1.16</flink.artifact.name>
</properties>
<repositories>
<repository>
<id>artifact-registry</id>
<url>https://us-maven.pkg.dev/dm-artifacts/java-artifacts</url>
<releases>
<enabled>true</enabled>
</releases>
<snapshots>
<enabled>true</enabled>
</snapshots>
</repository>
<repository>
<id>central</id>
<name>Maven Repository</name>
<url>https://repo.maven.apache.org/maven2</url>
<snapshots>
<enabled>false</enabled>
</snapshots>
</repository>
</repositories>
<distributionManagement>
<repository>
<id>artifact-registry</id>
<url>https://us-maven.pkg.dev/dm-artifacts/java-artifacts/</url>
</repository>
<snapshotRepository>
<id>artifact-registry</id>
<url>https://us-maven.pkg.dev/dm-artifacts/java-artifacts/</url>
</snapshotRepository>
</distributionManagement>
<dependencies>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>${org.slf4j.version}</version>
</dependency>
<!-- Adds a dependency on the Beam SDK. -->
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-sdks-java-core</artifactId>
<version>${beam.version}</version>
</dependency>
<!-- Adds a dependency on the Beam Google Cloud Platform IO module.
-->
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-sdks-java-io-google-cloud-platform</artifactId>
<version>${beam.version}</version>
</dependency>
</dependencies>
<profiles>
<profile>
<id>direct-runner</id>
<activation>
<activeByDefault>true</activeByDefault>
</activation>
<!-- Makes the DirectRunner available when running a pipeline.
-->
<dependencies>
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-runners-direct-java</artifactId>
<version>${beam.version}</version>
<scope>runtime</scope>
</dependency>
</dependencies>
</profile>
<profile>
<id>flink-runner</id>
<!-- Makes the FlinkRunner available when running a pipeline. -->
<dependencies>
<dependency>
<groupId>org.apache.beam</groupId>
<!-- Please see the Flink Runner page for an up-to-date
list
of supported Flink versions and their artifact names:
https://beam.apache.org/documentation/runners/flink/ -->
<artifactId>${flink.artifact.name}</artifactId>
<version>${beam.version}</version>
<scope>runtime</scope>
</dependency>
</dependencies>
</profile>
</profiles>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-resources-plugin</artifactId>
<version>3.0.2</version>
<executions>
<execution>
<id>template-videocatalog-dockerfile</id>
<phase>validate</phase>
<goals>
<goal>copy-resources</goal>
</goals>
<configuration>
<outputDirectory>deploy/videocatalog</outputDirectory>
<resources>
<resource>
<directory>deploy/videocatalog/template</directory>
<filtering>true</filtering>
</resource>
</resources>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<version>${maven-compiler-plugin.version}</version>
<configuration>
<source>${jdk.version}</source>
<target>${jdk.version}</target>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
<version>3.0.2</version>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.0.0</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<filters>
<filter>
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/LICENSE</exclude>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
<transformers>
<transformer
implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"
/>
</transformers>
<shadedArtifactAttached>true</shadedArtifactAttached>
<shadedClassifierName>bundled</shadedClassifierName>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<version>2.18.1</version>
<configuration>
<parallel>all</parallel>
<threadCount>4</threadCount>
<redirectTestOutputToFile>true</redirectTestOutputToFile>
</configuration>
<dependencies>
<dependency>
<groupId>org.apache.maven.surefire</groupId>
<artifactId>surefire-junit47</artifactId>
<version>2.18.1</version>
</dependency>
</dependencies>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-deploy-plugin</artifactId>
<version>2.8.1</version>
<executions>
<execution>
<id>default-deploy</id>
<phase>deploy</phase>
<goals>
<goal>deploy</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
```
And the associated configuration:
```yaml
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
annotations:
meta.helm.sh/release-name: data-octopus.flink-pipelines
meta.helm.sh/release-namespace: flink-pipelines
rollme: Uv1n7
creationTimestamp: "2024-01-01T15:15:31Z"
finalizers:
- flinkdeployments.flink.apache.org/finalizer
generation: 5
labels:
app.kubernetes.io/managed-by: Helm
environment: staging
name: vc-realtime-ix7-staging
namespace: flink-pipelines
resourceVersion: "2211827076"
uid: 328c9b41-ce8a-4165-9c9e-80141c9eb16d
spec:
flinkConfiguration:
env.java.opts: -Dlog4j2.formatMsgNoLookups=true
high-availability:
org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
high-availability.storageDir: s3a://flink/ha/videocatalog-realtime
kubernetes.jobmanager.entrypoint.args:
-Ds3.access-key=${MINIO_ACCESS_KEY} -Ds3.secret-key=${MINIO_SECRET_KEY}
-Dmetrics.reporter.dghttp.apikey=${DATADOG_API_KEY}
kubernetes.taskmanager.entrypoint.args:
-Ds3.access-key=${MINIO_ACCESS_KEY} -Ds3.secret-key=${MINIO_SECRET_KEY}
-Dmetrics.reporter.dghttp.apikey=${DATADOG_API_KEY}
metrics.reporter.dghttp.class:
org.apache.flink.metrics.datadog.DatadogHttpReporter
metrics.reporter.dghttp.tags: app:videocatalog-realtime,env:staging
metrics.scope.jm: flink.jm
metrics.scope.jm.job: flink.jm.job
metrics.scope.operator: flink.operator
metrics.scope.task: flink.task
metrics.scope.tm: flink.tm
metrics.scope.tm.job: flink.tm.job
s3.endpoint: http://flink-minio-svc:9000
s3.path-style-access: "true"
state.backend.type: hashmap
state.checkpoint-storage: filesystem
state.checkpoints.dir:
s3a://flink/recommender/videocatalog/externalized-checkpoints
state.savepoints.dir: s3a://flink/recommender/videocatalog/savepoints
taskmanager.numberOfTaskSlots: "4"
web.timeout: "100000"
web.upload.dir: /opt/flink
flinkVersion: v1_16
image: quay.io/mycompany/data-octopus-flink:a64370c
imagePullPolicy: Always
ingress:
annotations:
external-dns.alpha.kubernetes.io/hostname:
vc-realtime-ix7-staging.mydomain.com
external-dns.alpha.kubernetes.io/target: ****
external-dns.alpha.kubernetes.io/ttl: "120"
nginx.ingress.kubernetes.io/whitelist-source-range: *****
className: nginx-priv
template: vc-realtime-ix7-staging.mydomain.com
job:
args:
-
--oplogPubsubSubscription=projects/my-company/subscriptions/oplog-low-latency.aerospike
- --runner=FlinkRunner
- --streaming=true
- --attachedMode=false
- --checkpointingInterval=60000
- --latencyTrackingInterval=60000
entryClass: com.mycompany.reco_videocatalog.VideoCatalogWriteJob
jarURI: local:///opt/flink/flink-web-upload/data-octopus-bundled.jar
parallelism: 8
state: running
upgradeMode: savepoint
jobManager:
replicas: 1
resource:
cpu: 0.5
memory: 2g
podTemplate:
apiVersion: v1
kind: Pod
spec:
containers:
- env:
- name: DATADOG_API_KEY
valueFrom:
secretKeyRef:
key: datadogApiKey
name: data-octopus
- name: GOOGLE_APPLICATION_CREDENTIALS
value: /var/secrets/google/google-credentials
- name: MINIO_ACCESS_KEY
valueFrom:
secretKeyRef:
key: minioAccessKey
name: data-octopus
- name: MINIO_SECRET_KEY
valueFrom:
secretKeyRef:
key: minioSecretKey
name: data-octopus
name: flink-main-container
volumeMounts:
- mountPath: /var/secrets/google
name: gcp-serviceaccount
- mountPath: /var/secrets/certs
name: mycompany-ca-cert
imagePullSecrets:
- name: mycompany-puller-pull-secret
volumes:
- name: gcp-serviceaccount
secret:
items:
- key: google-credentials
path: google-credentials
secretName: data-octopus
- name: mycompany-ca-cert
secret:
items:
- key: ca.crt
path: ca.crt
secretName: mycompany-ca-cert
serviceAccount: flink
taskManager:
resource:
cpu: 2
memory: 4g
```
Here is a screenshot from the Google Cloud Console showing that messages are
never acked:

Thank you for your help :)
### Issue Priority
Priority: 2 (default / most bugs should be filed as P2)
### Issue Components
- [ ] Component: Python SDK
- [X] Component: Java SDK
- [ ] Component: Go SDK
- [ ] Component: Typescript SDK
- [ ] Component: IO connector
- [ ] Component: Beam YAML
- [ ] Component: Beam examples
- [ ] Component: Beam playground
- [ ] Component: Beam katas
- [ ] Component: Website
- [ ] Component: Spark Runner
- [X] Component: Flink Runner
- [ ] Component: Samza Runner
- [ ] Component: Twister2 Runner
- [ ] Component: Hazelcast Jet Runner
- [ ] Component: Google Cloud Dataflow Runner
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]