gfalcone opened a new issue, #29902:
URL: https://github.com/apache/beam/issues/29902

   ### What happened?
   
   Hello!
   
   I have a streaming job processing messages from Pub/Sub that no longer works with Beam 2.52.0 on the Flink Runner (in detached mode).
   
   The same pipeline works fine with Beam 2.51.0.
   
   Here is the pipeline code:
   
   ```java
   package com.mycompany.reco_videocatalog;
   
   import java.io.IOException;
   import java.io.UnsupportedEncodingException;
   import java.net.URISyntaxException;
   import java.security.GeneralSecurityException;
   import org.apache.beam.sdk.Pipeline;
   import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
   import org.apache.beam.sdk.options.PipelineOptionsFactory;
   import org.apache.beam.sdk.transforms.DoFn;
   import org.apache.beam.sdk.transforms.ParDo;
   import org.slf4j.Logger;
   import org.slf4j.LoggerFactory;
   
   public class VideoCatalogWriteJob {
   
       private static final Logger logger = LoggerFactory.getLogger(VideoCatalogWriteJob.class);
   
   
       public static void main(String[] args) throws URISyntaxException, UnsupportedEncodingException, IOException, GeneralSecurityException,
              InterruptedException {
                  // Register our custom options interface and parse command line arguments.
                  PipelineOptionsFactory.register(Options.class);
   
                  final Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
   
   
                  // Create the pipeline with the specified options.
                  final Pipeline p = Pipeline.create(options);
   
                  // Read input data
                  p
                  .apply("Read from Pub/Sub", PubsubIO.readStrings().fromSubscription(options.getOplogPubsubSubscription()));
                  
                  // Start pipeline execution.
                  p.run();
       }
   }
   
   ```
   
   Here is the pom.xml:
   
   ```xml
   <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
       <modelVersion>4.0.0</modelVersion>
       <groupId>com.mycompany</groupId>
       <artifactId>data-octopus</artifactId>
       <version>1.5.2</version>
       <name>data-octopus</name>
       <description>Recommendation features computation batch and real-time jobs</description>
   
       <properties>
           <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
           <jdk.version>1.8</jdk.version>
   
           <maven-compiler-plugin.version>3.5.1</maven-compiler-plugin.version>
   
           <org.slf4j.version>1.7.21</org.slf4j.version>
           <ch.qos.logback.version>1.4.14</ch.qos.logback.version>
           <commons-math3.version>3.6.1</commons-math3.version>
           <guava.version>33.0.0-jre</guava.version>
           <beam.version>2.52.0</beam.version>
           <jackson.version>2.16.1</jackson.version>
           <aerospike-client.version>5.3.0</aerospike-client.version>
           <flink.artifact.name>beam-runners-flink-1.16</flink.artifact.name>
       </properties>
   
       <repositories>
          <repository>
               <id>artifact-registry</id>
               <url>https://us-maven.pkg.dev/dm-artifacts/java-artifacts</url>
               <releases>
                   <enabled>true</enabled>
               </releases>
               <snapshots>
                   <enabled>true</enabled>
                </snapshots>
           </repository>
           <repository>
               <id>central</id>
               <name>Maven Repository</name>
               <url>https://repo.maven.apache.org/maven2</url>
               <snapshots>
                   <enabled>false</enabled>
               </snapshots>
           </repository>
       </repositories>
       <distributionManagement>
           <repository>
               <id>artifact-registry</id>
               <url>https://us-maven.pkg.dev/dm-artifacts/java-artifacts/</url>
           </repository>
           <snapshotRepository>
               <id>artifact-registry</id>
               <url>https://us-maven.pkg.dev/dm-artifacts/java-artifacts/</url>
           </snapshotRepository>
       </distributionManagement>
   
       <dependencies>
           <dependency>
               <groupId>org.slf4j</groupId>
               <artifactId>slf4j-api</artifactId>
               <version>${org.slf4j.version}</version>
           </dependency>
   
           <!-- Adds a dependency on the Beam SDK. -->
           <dependency>
               <groupId>org.apache.beam</groupId>
               <artifactId>beam-sdks-java-core</artifactId>
               <version>${beam.version}</version>
           </dependency>
           <!-- Adds a dependency on the Beam Google Cloud Platform IO module. -->
           <dependency>
               <groupId>org.apache.beam</groupId>
               <artifactId>beam-sdks-java-io-google-cloud-platform</artifactId>
               <version>${beam.version}</version>
           </dependency>
       </dependencies>
   
       <profiles>
           <profile>
               <id>direct-runner</id>
               <activation>
                   <activeByDefault>true</activeByDefault>
               </activation>
               <!-- Makes the DirectRunner available when running a pipeline. -->
               <dependencies>
                   <dependency>
                       <groupId>org.apache.beam</groupId>
                       <artifactId>beam-runners-direct-java</artifactId>
                       <version>${beam.version}</version>
                       <scope>runtime</scope>
                   </dependency>
               </dependencies>
           </profile>
           <profile>
               <id>flink-runner</id>
               <!-- Makes the FlinkRunner available when running a pipeline. -->
               <dependencies>
                   <dependency>
                       <groupId>org.apache.beam</groupId>
                       <!-- Please see the Flink Runner page for an up-to-date list
                           of supported Flink versions and their artifact names:
                           https://beam.apache.org/documentation/runners/flink/ -->
                       <artifactId>${flink.artifact.name}</artifactId>
                       <version>${beam.version}</version>
                       <scope>runtime</scope>
                   </dependency>
               </dependencies>
           </profile>
       </profiles>
   
       <build>
           <plugins>
               <plugin>
                   <groupId>org.apache.maven.plugins</groupId>
                   <artifactId>maven-resources-plugin</artifactId>
                   <version>3.0.2</version>
                   <executions>
                       <execution>
                           <id>template-videocatalog-dockerfile</id>
                           <phase>validate</phase>
                           <goals>
                               <goal>copy-resources</goal>
                           </goals>
                           <configuration>
                               <outputDirectory>deploy/videocatalog</outputDirectory>
                               <resources>
                                   <resource>
                                       <directory>deploy/videocatalog/template</directory>
                                       <filtering>true</filtering>
                                   </resource>
                               </resources>
                           </configuration>
                       </execution>
                   </executions>
               </plugin>
               <plugin>
                   <artifactId>maven-compiler-plugin</artifactId>
                   <version>${maven-compiler-plugin.version}</version>
                   <configuration>
                       <source>${jdk.version}</source>
                       <target>${jdk.version}</target>
                   </configuration>
               </plugin>
   
   
               <plugin>
                   <groupId>org.apache.maven.plugins</groupId>
                   <artifactId>maven-jar-plugin</artifactId>
                   <version>3.0.2</version>
               </plugin>
   
               <plugin>
                   <groupId>org.apache.maven.plugins</groupId>
                   <artifactId>maven-shade-plugin</artifactId>
                   <version>3.0.0</version>
                   <executions>
                       <execution>
                           <phase>package</phase>
                           <goals>
                               <goal>shade</goal>
                           </goals>
                           <configuration>
                               <filters>
                                   <filter>
                                       <artifact>*:*</artifact>
                                       <excludes>
                                           <exclude>META-INF/LICENSE</exclude>
                                           <exclude>META-INF/*.SF</exclude>
                                           <exclude>META-INF/*.DSA</exclude>
                                           <exclude>META-INF/*.RSA</exclude>
                                       </excludes>
                                   </filter>
                               </filters>
                               <transformers>
                                   <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer" />
                               </transformers>
                               <shadedArtifactAttached>true</shadedArtifactAttached>
                               <shadedClassifierName>bundled</shadedClassifierName>
                           </configuration>
                       </execution>
                   </executions>
               </plugin>
   
               <plugin>
                   <groupId>org.apache.maven.plugins</groupId>
                   <artifactId>maven-surefire-plugin</artifactId>
                   <version>2.18.1</version>
                   <configuration>
                       <parallel>all</parallel>
                       <threadCount>4</threadCount>
                       <redirectTestOutputToFile>true</redirectTestOutputToFile>
                   </configuration>
                   <dependencies>
                       <dependency>
                           <groupId>org.apache.maven.surefire</groupId>
                           <artifactId>surefire-junit47</artifactId>
                           <version>2.18.1</version>
                       </dependency>
                   </dependencies>
               </plugin>
   
               <plugin>
                   <groupId>org.apache.maven.plugins</groupId>
                   <artifactId>maven-deploy-plugin</artifactId>
                   <version>2.8.1</version>
                   <executions>
                       <execution>
                           <id>default-deploy</id>
                           <phase>deploy</phase>
                           <goals>
                               <goal>deploy</goal>
                           </goals>
                       </execution>
                   </executions>
               </plugin>
           </plugins>
       </build>
   
   </project>
   
   ```
   
   And the associated FlinkDeployment configuration:
   
   ```yaml
   apiVersion: flink.apache.org/v1beta1
   kind: FlinkDeployment
   metadata:
     annotations:
       meta.helm.sh/release-name: data-octopus.flink-pipelines
       meta.helm.sh/release-namespace: flink-pipelines
       rollme: Uv1n7
     creationTimestamp: "2024-01-01T15:15:31Z"
     finalizers:
     - flinkdeployments.flink.apache.org/finalizer
     generation: 5
     labels:
       app.kubernetes.io/managed-by: Helm
       environment: staging
     name: vc-realtime-ix7-staging
     namespace: flink-pipelines
     resourceVersion: "2211827076"
     uid: 328c9b41-ce8a-4165-9c9e-80141c9eb16d
   spec:
     flinkConfiguration:
       env.java.opts: -Dlog4j2.formatMsgNoLookups=true
       high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
       high-availability.storageDir: s3a://flink/ha/videocatalog-realtime
       kubernetes.jobmanager.entrypoint.args: -Ds3.access-key=${MINIO_ACCESS_KEY} -Ds3.secret-key=${MINIO_SECRET_KEY}
         -Dmetrics.reporter.dghttp.apikey=${DATADOG_API_KEY}
       kubernetes.taskmanager.entrypoint.args: -Ds3.access-key=${MINIO_ACCESS_KEY} -Ds3.secret-key=${MINIO_SECRET_KEY}
         -Dmetrics.reporter.dghttp.apikey=${DATADOG_API_KEY}
       metrics.reporter.dghttp.class: org.apache.flink.metrics.datadog.DatadogHttpReporter
       metrics.reporter.dghttp.tags: app:videocatalog-realtime,env:staging
       metrics.scope.jm: flink.jm
       metrics.scope.jm.job: flink.jm.job
       metrics.scope.operator: flink.operator
       metrics.scope.task: flink.task
       metrics.scope.tm: flink.tm
       metrics.scope.tm.job: flink.tm.job
       s3.endpoint: http://flink-minio-svc:9000
       s3.path-style-access: "true"
       state.backend.type: hashmap
       state.checkpoint-storage: filesystem
       state.checkpoints.dir: s3a://flink/recommender/videocatalog/externalized-checkpoints
       state.savepoints.dir: s3a://flink/recommender/videocatalog/savepoints
       taskmanager.numberOfTaskSlots: "4"
       web.timeout: "100000"
       web.upload.dir: /opt/flink
     flinkVersion: v1_16
     image: quay.io/mycompany/data-octopus-flink:a64370c
     imagePullPolicy: Always
     ingress:
       annotations:
         external-dns.alpha.kubernetes.io/hostname: vc-realtime-ix7-staging.mydomain.com
         external-dns.alpha.kubernetes.io/target: ****
         external-dns.alpha.kubernetes.io/ttl: "120"
         nginx.ingress.kubernetes.io/whitelist-source-range: *****
       className: nginx-priv
       template: vc-realtime-ix7-staging.mydomain.com
     job:
       args:
       - --oplogPubsubSubscription=projects/my-company/subscriptions/oplog-low-latency.aerospike
       - --runner=FlinkRunner
       - --streaming=true
       - --attachedMode=false
       - --checkpointingInterval=60000
       - --latencyTrackingInterval=60000
       entryClass: com.mycompany.reco_videocatalog.VideoCatalogWriteJob
       jarURI: local:///opt/flink/flink-web-upload/data-octopus-bundled.jar
       parallelism: 8
       state: running
       upgradeMode: savepoint
     jobManager:
       replicas: 1
       resource:
         cpu: 0.5
         memory: 2g
     podTemplate:
       apiVersion: v1
       kind: Pod
       spec:
         containers:
         - env:
           - name: DATADOG_API_KEY
             valueFrom:
               secretKeyRef:
                 key: datadogApiKey
                 name: data-octopus
           - name: GOOGLE_APPLICATION_CREDENTIALS
             value: /var/secrets/google/google-credentials
           - name: MINIO_ACCESS_KEY
             valueFrom:
               secretKeyRef:
                 key: minioAccessKey
                 name: data-octopus
           - name: MINIO_SECRET_KEY
             valueFrom:
               secretKeyRef:
                 key: minioSecretKey
                 name: data-octopus
           name: flink-main-container
           volumeMounts:
           - mountPath: /var/secrets/google
             name: gcp-serviceaccount
           - mountPath: /var/secrets/certs
             name: mycompany-ca-cert
         imagePullSecrets:
         - name: mycompany-puller-pull-secret
         volumes:
         - name: gcp-serviceaccount
           secret:
             items:
             - key: google-credentials
               path: google-credentials
             secretName: data-octopus
         - name: mycompany-ca-cert
           secret:
             items:
             - key: ca.crt
               path: ca.crt
             secretName: mycompany-ca-cert
     serviceAccount: flink
     taskManager:
       resource:
         cpu: 2
         memory: 4g
   ```
   
   
   Here is a screenshot from the Google Cloud Console showing that messages are never acked:
   
   ![Screenshot 2024-01-03 at 09 41 22](https://github.com/apache/beam/assets/3447482/d1084d9b-6a19-45e0-84ef-230e64b8a3e6)
   
   Thank you for your help :) 
   
   ### Issue Priority
   
   Priority: 2 (default / most bugs should be filed as P2)
   
   ### Issue Components
   
   - [ ] Component: Python SDK
   - [X] Component: Java SDK
   - [ ] Component: Go SDK
   - [ ] Component: Typescript SDK
   - [ ] Component: IO connector
   - [ ] Component: Beam YAML
   - [ ] Component: Beam examples
   - [ ] Component: Beam playground
   - [ ] Component: Beam katas
   - [ ] Component: Website
   - [ ] Component: Spark Runner
   - [X] Component: Flink Runner
   - [ ] Component: Samza Runner
   - [ ] Component: Twister2 Runner
   - [ ] Component: Hazelcast Jet Runner
   - [ ] Component: Google Cloud Dataflow Runner

