This is an automated email from the ASF dual-hosted git repository.

hansva pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/hop.git


The following commit(s) were added to refs/heads/main by this push:
     new ae3dc18080 Upgrade beam version, fixes #6288 (#6289)
ae3dc18080 is described below

commit ae3dc180803a7b28b910df26e8bd01754392901d
Author: Hans Van Akelyen <[email protected]>
AuthorDate: Mon Jan 5 11:04:20 2026 +0100

    Upgrade beam version, fixes #6288 (#6289)
    
    * Upgrade beam version, fixes #6288
    
    * Add script to find duplicate jars in hop folder
    
    * update docs
    
    * fix log4j errors
    
    * clean up some duplicate jars
    
    fix pom
---
 .../pipeline/beam/getting-started-with-beam.adoc   |  10 ++
 lib/pom.xml                                        |  13 +-
 plugins/engines/beam/pom.xml                       | 184 +++++++++++----------
 plugins/engines/beam/src/assembly/assembly.xml     |   1 +
 .../hop/beam/engines/BeamPipelineEngine.java       |   2 +
 .../transforms/kinesis/BeamKinesisConsumeMeta.java |   3 +-
 .../kinesis/BeamKinesisConsumeTransform.java       |  31 +++-
 .../transforms/kinesis/BeamKinesisProduceMeta.java |   3 +-
 .../kinesis/BeamKinesisProduceTransform.java       |  97 ++++++++---
 .../kinesis/KinesisRecordToHopRowFn.java           |   2 +-
 plugins/tech/neo4j/pom.xml                         |  12 ++
 plugins/tech/neo4j/src/assembly/assembly.xml       |   2 +
 pom.xml                                            |   2 +-
 tools/find_duplicate_jars.sh                       | 131 +++++++++++++++
 14 files changed, 357 insertions(+), 136 deletions(-)

diff --git 
a/docs/hop-user-manual/modules/ROOT/pages/pipeline/beam/getting-started-with-beam.adoc
 
b/docs/hop-user-manual/modules/ROOT/pages/pipeline/beam/getting-started-with-beam.adoc
index 80a25ee938..6ace00a5c5 100644
--- 
a/docs/hop-user-manual/modules/ROOT/pages/pipeline/beam/getting-started-with-beam.adoc
+++ 
b/docs/hop-user-manual/modules/ROOT/pages/pipeline/beam/getting-started-with-beam.adoc
@@ -52,6 +52,16 @@ Here is the documentation for the relevant plugins:
 |===
 |Hop version |Beam version |Spark version |Flink version
 
+|2.17.0
+|2.70.0
+|3.5.X (scala 2.12)
+|1.19.x
+
+|2.16.0
+|2.62.0
+|3.4.X (scala 2.12)
+|1.17.x
+
 |2.9.0
 |2.56.0
 |3.4.X (scala 2.12)
diff --git a/lib/pom.xml b/lib/pom.xml
index 08c5024bb7..c285320a19 100644
--- a/lib/pom.xml
+++ b/lib/pom.xml
@@ -77,12 +77,12 @@
         <google-api-common.version>2.13.0</google-api-common.version>
         
<google-api-services-bigquery.version>v2-rev20240229-2.0.0</google-api-services-bigquery.version>
         
<google-api-services-storage.version>v1-rev20240311-2.0.0</google-api-services-storage.version>
-        
<google-auth-library-credentials.version>1.30.0</google-auth-library-credentials.version>
-        
<google-auth-library-oauth2-http.version>1.30.0</google-auth-library-oauth2-http.version>
+        
<google-auth-library-credentials.version>1.40.0</google-auth-library-credentials.version>
+        
<google-auth-library-oauth2-http.version>1.40.0</google-auth-library-oauth2-http.version>
         <google-cloud-storage.version>2.15.0</google-cloud-storage.version>
         <google-http-client.version>1.42.3</google-http-client.version>
         <google-oauth-client.version>1.34.1</google-oauth-client.version>
-        <gson.version>2.11.0</gson.version>
+        <gson.version>2.12.1</gson.version>
         <!--check Beam BOM to find matching version 
https://repo1.maven.org/maven2/org/apache/beam/beam-runners-google-cloud-dataflow-java/xxx/beam-runners-google-cloud-dataflow-java-xxx.pom
         Replace exclusions beam-vendor-guava-xxx-jre-->
         <guava.version>33.3.1-jre</guava.version>
@@ -111,7 +111,7 @@
         
<kafka-schema-registry-client.version>7.6.1</kafka-schema-registry-client.version>
         <kotlin.version>1.9.25</kotlin.version>
         <kryo.version>5.3.0</kryo.version>
-        <log4j.version>2.23.1</log4j.version>
+        <log4j.version>2.25.3</log4j.version>
         <lz4.version>1.8.0</lz4.version>
         <metrics.version>4.2.12</metrics.version>
         <minlog.version>1.3.1</minlog.version>
@@ -230,11 +230,6 @@
                 <artifactId>aws-java-sdk-redshift</artifactId>
                 <version>${aws-java-sdk.version}</version>
             </dependency>
-            <dependency>
-                <groupId>com.amazonaws</groupId>
-                <artifactId>aws-java-sdk-s3</artifactId>
-                <version>${aws-java-sdk.version}</version>
-            </dependency>
             <dependency>
                 <groupId>com.databricks</groupId>
                 <artifactId>spark-avro_2.11</artifactId>
diff --git a/plugins/engines/beam/pom.xml b/plugins/engines/beam/pom.xml
index e91120fb08..98fbbf18e8 100644
--- a/plugins/engines/beam/pom.xml
+++ b/plugins/engines/beam/pom.xml
@@ -321,7 +321,7 @@
                 </exclusion>
                 <exclusion>
                     <groupId>org.apache.beam</groupId>
-                    <artifactId>beam-vendor-grpc-1_60_1</artifactId>
+                    <artifactId>beam-vendor-grpc-1_69_0</artifactId>
                 </exclusion>
                 <exclusion>
                     <groupId>org.apache.beam</groupId>
@@ -415,7 +415,7 @@
                 </exclusion>
                 <exclusion>
                     <groupId>org.apache.beam</groupId>
-                    <artifactId>beam-vendor-grpc-1_60_1</artifactId>
+                    <artifactId>beam-vendor-grpc-1_69_0</artifactId>
                 </exclusion>
                 <exclusion>
                     <groupId>org.apache.beam</groupId>
@@ -521,7 +521,7 @@
                 </exclusion>
                 <exclusion>
                     <groupId>org.apache.beam</groupId>
-                    <artifactId>beam-vendor-grpc-1_60_1</artifactId>
+                    <artifactId>beam-vendor-grpc-1_69_0</artifactId>
                 </exclusion>
                 <exclusion>
                     <groupId>org.apache.beam</groupId>
@@ -583,7 +583,7 @@
                 </exclusion>
                 <exclusion>
                     <groupId>org.apache.beam</groupId>
-                    <artifactId>beam-vendor-grpc-1_60_1</artifactId>
+                    <artifactId>beam-vendor-grpc-1_69_0</artifactId>
                 </exclusion>
                 <exclusion>
                     <groupId>org.apache.beam</groupId>
@@ -593,22 +593,104 @@
                     <groupId>org.apache.commons</groupId>
                     <artifactId>commons-lang3</artifactId>
                 </exclusion>
+                <exclusion>
+                    <groupId>org.apache.logging.log4j</groupId>
+                    <artifactId>log4j-api</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.logging.log4j</groupId>
+                    <artifactId>log4j-core</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.logging.log4j</groupId>
+                    <artifactId>log4j-slf4j2-impl</artifactId>
+                </exclusion>
                 <exclusion>
                     <groupId>org.checkerframework</groupId>
                     <artifactId>checker-qual</artifactId>
                 </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.beam</groupId>
+            <artifactId>beam-sdks-java-extensions-join-library</artifactId>
+            <exclusions>
                 <exclusion>
-                    <groupId>org.slf4j</groupId>
+                    <groupId>*</groupId>
                     <artifactId>*</artifactId>
                 </exclusion>
             </exclusions>
         </dependency>
         <dependency>
             <groupId>org.apache.beam</groupId>
-            <artifactId>beam-sdks-java-extensions-join-library</artifactId>
+            <artifactId>beam-sdks-java-io-amazon-web-services2</artifactId>
             <exclusions>
                 <exclusion>
-                    <groupId>*</groupId>
+                    <groupId>com.fasterxml.jackson.core</groupId>
+                    <artifactId>*</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>com.google.guava</groupId>
+                    <artifactId>*</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>com.google.protobuf</groupId>
+                    <artifactId>protobuf-java</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>commons-codec</groupId>
+                    <artifactId>commons-codec</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>commons-io</groupId>
+                    <artifactId>commons-io</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>commons-lang</groupId>
+                    <artifactId>commons-lang</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>commons-logging</groupId>
+                    <artifactId>commons-logging</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>joda-time</groupId>
+                    <artifactId>joda-time</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>net.bytebuddy</groupId>
+                    <artifactId>byte-buddy</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.beam</groupId>
+                    <artifactId>beam-sdks-java-core</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.beam</groupId>
+                    <artifactId>beam-vendor-grpc-1_69_0</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.beam</groupId>
+                    <artifactId>beam-vendor-guava-33.1.0-jre-jre</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.commons</groupId>
+                    <artifactId>commons-lang3</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.httpcomponents</groupId>
+                    <artifactId>*</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.checkerframework</groupId>
+                    <artifactId>checker-qual</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.conscrypt</groupId>
+                    <artifactId>conscrypt-openjdk-uber</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.slf4j</groupId>
                     <artifactId>*</artifactId>
                 </exclusion>
             </exclusions>
@@ -809,7 +891,7 @@
                 </exclusion>
                 <exclusion>
                     <groupId>org.apache.beam</groupId>
-                    <artifactId>beam-vendor-grpc-1_60_1</artifactId>
+                    <artifactId>beam-vendor-grpc-1_69_0</artifactId>
                 </exclusion>
                 <exclusion>
                     <groupId>org.apache.beam</groupId>
@@ -953,7 +1035,7 @@
                 </exclusion>
                 <exclusion>
                     <groupId>org.apache.beam</groupId>
-                    <artifactId>beam-vendor-grpc-1_60_1</artifactId>
+                    <artifactId>beam-vendor-grpc-1_69_0</artifactId>
                 </exclusion>
                 <exclusion>
                     <groupId>org.apache.beam</groupId>
@@ -977,68 +1059,6 @@
                 </exclusion>
             </exclusions>
         </dependency>
-        <dependency>
-            <groupId>org.apache.beam</groupId>
-            <artifactId>beam-sdks-java-io-kinesis</artifactId>
-            <exclusions>
-                <exclusion>
-                    <groupId>com.fasterxml.jackson.core</groupId>
-                    <artifactId>*</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>com.google.guava</groupId>
-                    <artifactId>*</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>com.google.protobuf</groupId>
-                    <artifactId>protobuf-java</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>commons-codec</groupId>
-                    <artifactId>commons-codec</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>commons-io</groupId>
-                    <artifactId>commons-io</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>commons-lang</groupId>
-                    <artifactId>commons-lang</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>commons-logging</groupId>
-                    <artifactId>commons-logging</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>joda-time</groupId>
-                    <artifactId>joda-time</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>org.apache.beam</groupId>
-                    <artifactId>beam-sdks-java-core</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>org.apache.beam</groupId>
-                    <artifactId>beam-vendor-guava-33.1.0-jre-jre</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>org.apache.commons</groupId>
-                    <artifactId>commons-lang3</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>org.apache.httpcomponents</groupId>
-                    <artifactId>*</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>org.checkerframework</groupId>
-                    <artifactId>checker-qual</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>org.slf4j</groupId>
-                    <artifactId>*</artifactId>
-                </exclusion>
-            </exclusions>
-        </dependency>
         <dependency>
             <groupId>org.apache.beam</groupId>
             <artifactId>beam-sdks-java-io-synthetic</artifactId>
@@ -1154,10 +1174,6 @@
                     <groupId>jakarta.servlet</groupId>
                     <artifactId>jakarta.servlet-api</artifactId>
                 </exclusion>
-                <exclusion>
-                    <groupId>log4j</groupId>
-                    <artifactId>log4j</artifactId>
-                </exclusion>
                 <exclusion>
                     <groupId>net.minidev</groupId>
                     <artifactId>json-smart</artifactId>
@@ -1206,6 +1222,10 @@
                     <groupId>org.apache.httpcomponents</groupId>
                     <artifactId>*</artifactId>
                 </exclusion>
+                <exclusion>
+                    <groupId>org.apache.logging.log4j</groupId>
+                    <artifactId>log4j</artifactId>
+                </exclusion>
                 <exclusion>
                     <groupId>org.apache.zookeeper</groupId>
                     <artifactId>zookeeper</artifactId>
@@ -1309,10 +1329,6 @@
                     <groupId>jakarta.servlet</groupId>
                     <artifactId>jakarta.servlet-api</artifactId>
                 </exclusion>
-                <exclusion>
-                    <groupId>log4j</groupId>
-                    <artifactId>log4j</artifactId>
-                </exclusion>
                 <exclusion>
                     <groupId>net.minidev</groupId>
                     <artifactId>json-smart</artifactId>
@@ -1361,6 +1377,10 @@
                     <groupId>org.apache.httpcomponents</groupId>
                     <artifactId>*</artifactId>
                 </exclusion>
+                <exclusion>
+                    <groupId>org.apache.logging.log4j</groupId>
+                    <artifactId>log4j</artifactId>
+                </exclusion>
                 <exclusion>
                     <groupId>org.apache.zookeeper</groupId>
                     <artifactId>zookeeper</artifactId>
@@ -1379,12 +1399,6 @@
             <groupId>org.apache.beam</groupId>
             <artifactId>beam-sdks-java-core</artifactId>
             <scope>provided</scope>
-            <exclusions>
-                <exclusion>
-                    <groupId>org.xerial.snappy</groupId>
-                    <artifactId>snappy-java</artifactId>
-                </exclusion>
-            </exclusions>
         </dependency>
         <dependency>
             <groupId>org.apache.hop</groupId>
diff --git a/plugins/engines/beam/src/assembly/assembly.xml 
b/plugins/engines/beam/src/assembly/assembly.xml
index c3cbc46668..9489e8d482 100644
--- a/plugins/engines/beam/src/assembly/assembly.xml
+++ b/plugins/engines/beam/src/assembly/assembly.xml
@@ -153,6 +153,7 @@
                 <exclude>io.confluent::jar</exclude>
                 <exclude>com.google.code.gson:gson:jar</exclude>
                 <exclude>org.apache.commons:commons-collections4:jar</exclude>
+                <exclude>org.jspecify:jspecify:jar</exclude>
             </excludes>
             <outputDirectory>lib/beam</outputDirectory>
         </dependencySet>
diff --git 
a/plugins/engines/beam/src/main/java/org/apache/hop/beam/engines/BeamPipelineEngine.java
 
b/plugins/engines/beam/src/main/java/org/apache/hop/beam/engines/BeamPipelineEngine.java
index 59bf473450..6f48450192 100644
--- 
a/plugins/engines/beam/src/main/java/org/apache/hop/beam/engines/BeamPipelineEngine.java
+++ 
b/plugins/engines/beam/src/main/java/org/apache/hop/beam/engines/BeamPipelineEngine.java
@@ -98,6 +98,8 @@ public abstract class BeamPipelineEngine extends Variables
 
   static MetricResults EMPTY_METRIC_RESULTS =
       new DefaultMetricResults(
+          Collections.emptyList(),
+          Collections.emptyList(),
           Collections.emptyList(),
           Collections.emptyList(),
           Collections.emptyList(),
diff --git 
a/plugins/engines/beam/src/main/java/org/apache/hop/beam/transforms/kinesis/BeamKinesisConsumeMeta.java
 
b/plugins/engines/beam/src/main/java/org/apache/hop/beam/transforms/kinesis/BeamKinesisConsumeMeta.java
index 2da4d7e6c7..95281325ae 100644
--- 
a/plugins/engines/beam/src/main/java/org/apache/hop/beam/transforms/kinesis/BeamKinesisConsumeMeta.java
+++ 
b/plugins/engines/beam/src/main/java/org/apache/hop/beam/transforms/kinesis/BeamKinesisConsumeMeta.java
@@ -18,7 +18,6 @@
 
 package org.apache.hop.beam.transforms.kinesis;
 
-import com.amazonaws.regions.Regions;
 import java.util.List;
 import java.util.Map;
 import org.apache.beam.sdk.values.PCollection;
@@ -256,7 +255,7 @@ public class BeamKinesisConsumeMeta extends 
BaseTransformMeta<BeamKinesisConsume
             transformMeta.getName(),
             variables.resolve(accessKey),
             variables.resolve(secretKey),
-            Regions.DEFAULT_REGION, // TODO : make configurable
+            "us-east-1", // TODO : make configurable
             JsonRowMeta.toJson(outputRowMeta),
             variables.resolve(streamName),
             variables.resolve(uniqueIdField),
diff --git 
a/plugins/engines/beam/src/main/java/org/apache/hop/beam/transforms/kinesis/BeamKinesisConsumeTransform.java
 
b/plugins/engines/beam/src/main/java/org/apache/hop/beam/transforms/kinesis/BeamKinesisConsumeTransform.java
index 07c2caeb6e..a55881cc43 100644
--- 
a/plugins/engines/beam/src/main/java/org/apache/hop/beam/transforms/kinesis/BeamKinesisConsumeTransform.java
+++ 
b/plugins/engines/beam/src/main/java/org/apache/hop/beam/transforms/kinesis/BeamKinesisConsumeTransform.java
@@ -17,11 +17,10 @@
 
 package org.apache.hop.beam.transforms.kinesis;
 
-import com.amazonaws.regions.Regions;
-import 
com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream;
 import org.apache.avro.generic.GenericRecord;
-import org.apache.beam.sdk.io.kinesis.KinesisIO;
-import org.apache.beam.sdk.io.kinesis.KinesisRecord;
+import org.apache.beam.sdk.io.aws2.common.ClientConfiguration;
+import org.apache.beam.sdk.io.aws2.kinesis.KinesisIO;
+import org.apache.beam.sdk.io.aws2.kinesis.KinesisRecord;
 import org.apache.beam.sdk.metrics.Counter;
 import org.apache.beam.sdk.metrics.Metrics;
 import org.apache.beam.sdk.transforms.DoFn;
@@ -39,6 +38,10 @@ import org.apache.hop.pipeline.Pipeline;
 import org.joda.time.Duration;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
+import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
+import software.amazon.awssdk.regions.Region;
+import software.amazon.kinesis.common.InitialPositionInStream;
 
 public class BeamKinesisConsumeTransform extends PTransform<PBegin, 
PCollection<HopRow>> {
 
@@ -49,7 +52,7 @@ public class BeamKinesisConsumeTransform extends 
PTransform<PBegin, PCollection<
 
   private String accessKey;
   private String secretKey;
-  private Regions regions;
+  private String region;
   private String streamName;
   private String uniqueIdField;
   private String dataField;
@@ -80,7 +83,7 @@ public class BeamKinesisConsumeTransform extends 
PTransform<PBegin, PCollection<
       String transformName,
       String accessKey,
       String secretKey,
-      Regions regions,
+      String region,
       String rowMetaJson,
       String streamName,
       String uniqueIdField,
@@ -107,7 +110,7 @@ public class BeamKinesisConsumeTransform extends 
PTransform<PBegin, PCollection<
     this.streamName = streamName;
     this.accessKey = accessKey;
     this.secretKey = secretKey;
-    this.regions = regions;
+    this.region = region;
     this.uniqueIdField = uniqueIdField;
     this.dataField = dataField;
     this.dataType = dataType;
@@ -146,9 +149,19 @@ public class BeamKinesisConsumeTransform extends 
PTransform<PBegin, PCollection<
 
       PCollection<HopRow> output;
 
-      KinesisIO.Read<KinesisRecord> kinesisRecordRead =
+      StaticCredentialsProvider credentialsProvider =
+          
StaticCredentialsProvider.create(AwsBasicCredentials.create(accessKey, 
secretKey));
+      Region awsRegion = Region.of(region);
+
+      ClientConfiguration clientConfig =
+          ClientConfiguration.builder()
+              .credentialsProvider(credentialsProvider)
+              .region(awsRegion)
+              .build();
+
+      KinesisIO.Read kinesisRecordRead =
           KinesisIO.read()
-              .withAWSClientsProvider(accessKey, secretKey, regions)
+              .withClientConfiguration(clientConfig)
               .withStreamName(streamName)
               .withInitialPositionInStream(InitialPositionInStream.LATEST) // 
TODO make configurable
           ;
diff --git 
a/plugins/engines/beam/src/main/java/org/apache/hop/beam/transforms/kinesis/BeamKinesisProduceMeta.java
 
b/plugins/engines/beam/src/main/java/org/apache/hop/beam/transforms/kinesis/BeamKinesisProduceMeta.java
index f063f5028b..f78db90f69 100644
--- 
a/plugins/engines/beam/src/main/java/org/apache/hop/beam/transforms/kinesis/BeamKinesisProduceMeta.java
+++ 
b/plugins/engines/beam/src/main/java/org/apache/hop/beam/transforms/kinesis/BeamKinesisProduceMeta.java
@@ -17,7 +17,6 @@
 
 package org.apache.hop.beam.transforms.kinesis;
 
-import com.amazonaws.regions.Regions;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
@@ -146,7 +145,7 @@ public class BeamKinesisProduceMeta extends 
BaseTransformMeta<BeamKinesisProduce
             JsonRowMeta.toJson(rowMeta),
             variables.resolve(accessKey),
             variables.resolve(secretKey),
-            Regions.DEFAULT_REGION,
+            "us-east-1",
             variables.resolve(streamName),
             dataFieldName,
             variables.resolve(dataType),
diff --git 
a/plugins/engines/beam/src/main/java/org/apache/hop/beam/transforms/kinesis/BeamKinesisProduceTransform.java
 
b/plugins/engines/beam/src/main/java/org/apache/hop/beam/transforms/kinesis/BeamKinesisProduceTransform.java
index b038987dce..9cca6b8901 100644
--- 
a/plugins/engines/beam/src/main/java/org/apache/hop/beam/transforms/kinesis/BeamKinesisProduceTransform.java
+++ 
b/plugins/engines/beam/src/main/java/org/apache/hop/beam/transforms/kinesis/BeamKinesisProduceTransform.java
@@ -17,18 +17,19 @@
 
 package org.apache.hop.beam.transforms.kinesis;
 
-import com.amazonaws.regions.Regions;
 import java.util.ArrayList;
 import java.util.List;
-import java.util.Properties;
-import org.apache.beam.sdk.io.kinesis.KinesisIO;
+import org.apache.beam.sdk.io.aws2.common.ClientConfiguration;
+import org.apache.beam.sdk.io.aws2.kinesis.KinesisIO;
 import org.apache.beam.sdk.metrics.Counter;
 import org.apache.beam.sdk.metrics.Metrics;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PDone;
+import org.apache.commons.lang.StringUtils;
 import org.apache.hop.beam.core.BeamHop;
 import org.apache.hop.beam.core.HopRow;
 import org.apache.hop.core.exception.HopException;
@@ -38,6 +39,9 @@ import org.apache.hop.core.row.JsonRowMeta;
 import org.apache.hop.pipeline.Pipeline;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
+import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
+import software.amazon.awssdk.regions.Region;
 
 public class BeamKinesisProduceTransform extends 
PTransform<PCollection<HopRow>, PDone> {
 
@@ -46,7 +50,7 @@ public class BeamKinesisProduceTransform extends 
PTransform<PCollection<HopRow>,
 
   private String accessKey;
   private String secretKey;
-  private Regions regions;
+  private String region;
   private String streamName;
   private String dataField;
   private String dataType;
@@ -66,7 +70,7 @@ public class BeamKinesisProduceTransform extends 
PTransform<PCollection<HopRow>,
       String rowMetaJson,
       String accessKey,
       String secretKey,
-      Regions regions,
+      String region,
       String streamName,
       String dataField,
       String dataType,
@@ -80,7 +84,7 @@ public class BeamKinesisProduceTransform extends 
PTransform<PCollection<HopRow>,
     this.rowMetaJson = rowMetaJson;
     this.accessKey = accessKey;
     this.secretKey = secretKey;
-    this.regions = regions;
+    this.region = region;
     this.streamName = streamName;
     this.dataField = dataField;
     this.dataType = dataType;
@@ -114,27 +118,49 @@ public class BeamKinesisProduceTransform extends 
PTransform<PCollection<HopRow>,
         throw new HopException("For now, only Strings are supported as Kinesis 
data messages");
       }
 
-      // Add custom configuration options to this map:
-      Properties producerProperties = new Properties();
-      for (KinesisConfigOption configOption : configOptions) {
-        producerProperties.put(configOption.getParameter(), 
configOption.getValue());
-      }
+      // Note: Producer properties are not directly supported in the new aws2 
API
+      // Custom configuration options would need to be set through 
ClientConfiguration
+      // if needed in the future
 
-      // Convert to PCollection of KV<String, byte[]>
+      // Convert to PCollection of KV<String, byte[]> where key is partition 
key
       //
-      PCollection<byte[]> messages =
-          input.apply(ParDo.of(new HopRowToMessage(transformName, rowMetaJson, 
messageIndex)));
+      int partitionKeyIndex = -1;
+      if (StringUtils.isNotEmpty(partitionKey)) {
+        partitionKeyIndex = rowMeta.indexOfValue(partitionKey);
+        if (partitionKeyIndex < 0) {
+          throw new HopException(
+              "Unable to find partition key field "
+                  + partitionKey
+                  + " in input row: "
+                  + rowMeta.toString());
+        }
+      }
+
+      PCollection<KV<String, byte[]>> messages =
+          input.apply(
+              ParDo.of(
+                  new HopRowToKVMessage(
+                      transformName, rowMetaJson, messageIndex, 
partitionKeyIndex)));
 
-      // Write to Kinesis stream with <String, byte[]>
+      // Write to Kinesis stream with KV<String, byte[]>
       //
-      KinesisIO.Write write =
-          KinesisIO.write()
-              .withAWSClientsProvider(accessKey, secretKey, regions)
-              .withStreamName(streamName)
-              .withPartitionKey(partitionKey)
-              .withProducerProperties(producerProperties);
-
-      return messages.apply(write);
+      StaticCredentialsProvider credentialsProvider =
+          
StaticCredentialsProvider.create(AwsBasicCredentials.create(accessKey, 
secretKey));
+      Region awsRegion = Region.of(region);
+
+      ClientConfiguration clientConfig =
+          ClientConfiguration.builder()
+              .credentialsProvider(credentialsProvider)
+              .region(awsRegion)
+              .build();
+
+      KinesisIO.Write<KV<String, byte[]>> write =
+          KinesisIO.<KV<String, byte[]>>write()
+              .withClientConfiguration(clientConfig)
+              .withStreamName(streamName);
+
+      messages.apply(write);
+      return PDone.in(messages.getPipeline());
     } catch (Exception e) {
       numErrors.inc();
       LOG.error("Error in Beam Kinesis Produce transform", e);
@@ -142,21 +168,25 @@ public class BeamKinesisProduceTransform extends 
PTransform<PCollection<HopRow>,
     }
   }
 
-  // Simply convert HopRow to byte[]
+  // Convert HopRow to KV<String, byte[]> where key is partition key
   //
-  private static class HopRowToMessage extends DoFn<HopRow, byte[]> {
+  private static class HopRowToKVMessage extends DoFn<HopRow, KV<String, 
byte[]>> {
     private final int messageIndex;
+    private final int partitionKeyIndex;
     private final String transformName;
     private final String rowMetaJson;
 
     private transient IValueMeta valueMeta;
+    private transient IValueMeta partitionKeyValueMeta;
     private transient Counter outputCounter;
     private transient Counter readCounter;
 
-    public HopRowToMessage(String transformName, String rowMetaJson, int 
messageIndex) {
+    public HopRowToKVMessage(
+        String transformName, String rowMetaJson, int messageIndex, int 
partitionKeyIndex) {
       this.transformName = transformName;
       this.rowMetaJson = rowMetaJson;
       this.messageIndex = messageIndex;
+      this.partitionKeyIndex = partitionKeyIndex;
     }
 
     @Setup
@@ -170,6 +200,9 @@ public class BeamKinesisProduceTransform extends 
PTransform<PCollection<HopRow>,
         BeamHop.init();
         IRowMeta rowMeta = JsonRowMeta.fromJson(rowMetaJson);
         valueMeta = rowMeta.getValueMeta(messageIndex);
+        if (partitionKeyIndex >= 0) {
+          partitionKeyValueMeta = rowMeta.getValueMeta(partitionKeyIndex);
+        }
 
         Metrics.counter(Pipeline.METRIC_NAME_INIT, transformName).inc();
       } catch (Exception e) {
@@ -190,7 +223,17 @@ public class BeamKinesisProduceTransform extends 
PTransform<PCollection<HopRow>,
       //
       try {
         byte[] message = valueMeta.getBinary(hopRow.getRow()[messageIndex]);
-        processContext.output(message);
+
+        // Extract partition key
+        String partitionKeyValue = "";
+        if (partitionKeyIndex >= 0) {
+          partitionKeyValue = 
partitionKeyValueMeta.getString(hopRow.getRow()[partitionKeyIndex]);
+          if (partitionKeyValue == null) {
+            partitionKeyValue = "";
+          }
+        }
+
+        processContext.output(KV.of(partitionKeyValue, message));
         outputCounter.inc();
       } catch (Exception e) {
         throw new RuntimeException(
diff --git 
a/plugins/engines/beam/src/main/java/org/apache/hop/beam/transforms/kinesis/KinesisRecordToHopRowFn.java
 
b/plugins/engines/beam/src/main/java/org/apache/hop/beam/transforms/kinesis/KinesisRecordToHopRowFn.java
index 4e36bce3ed..ea30fd8ff6 100644
--- 
a/plugins/engines/beam/src/main/java/org/apache/hop/beam/transforms/kinesis/KinesisRecordToHopRowFn.java
+++ 
b/plugins/engines/beam/src/main/java/org/apache/hop/beam/transforms/kinesis/KinesisRecordToHopRowFn.java
@@ -19,7 +19,7 @@
 package org.apache.hop.beam.transforms.kinesis;
 
 import java.nio.charset.StandardCharsets;
-import org.apache.beam.sdk.io.kinesis.KinesisRecord;
+import org.apache.beam.sdk.io.aws2.kinesis.KinesisRecord;
 import org.apache.beam.sdk.metrics.Counter;
 import org.apache.beam.sdk.metrics.Metrics;
 import org.apache.beam.sdk.transforms.DoFn;
diff --git a/plugins/tech/neo4j/pom.xml b/plugins/tech/neo4j/pom.xml
index c5d676b5f6..f052a53c1b 100755
--- a/plugins/tech/neo4j/pom.xml
+++ b/plugins/tech/neo4j/pom.xml
@@ -32,6 +32,18 @@
         <neo4j.driver.version>5.15.0</neo4j.driver.version>
     </properties>
 
+    <dependencyManagement>
+        <dependencies>
+            <dependency>
+                <groupId>org.apache.hop</groupId>
+                <artifactId>hop-libs</artifactId>
+                <version>${project.version}</version>
+                <type>pom</type>
+                <scope>import</scope>
+            </dependency>
+        </dependencies>
+    </dependencyManagement>
+
     <dependencies>
         <dependency>
             <groupId>org.neo4j.driver</groupId>
diff --git a/plugins/tech/neo4j/src/assembly/assembly.xml 
b/plugins/tech/neo4j/src/assembly/assembly.xml
index 86c679e05c..55aa8ff693 100644
--- a/plugins/tech/neo4j/src/assembly/assembly.xml
+++ b/plugins/tech/neo4j/src/assembly/assembly.xml
@@ -51,6 +51,7 @@
             <excludes>
                 <exclude>org.apache.hop:hop-tech-neo4j:jar</exclude>
                 <exclude>org.reactivestreams:reactive-streams:jar</exclude>
+                <exclude>io.netty::jar</exclude>
             </excludes>
             <outputDirectory>plugins/tech/neo4j/lib</outputDirectory>
         </dependencySet>
@@ -58,6 +59,7 @@
             <scope>runtime</scope>
             <includes>
                 <include>org.reactivestreams:reactive-streams:jar</include>
+                <include>io.netty::jar</include>
             </includes>
             <outputDirectory>lib/core</outputDirectory>
         </dependencySet>
diff --git a/pom.xml b/pom.xml
index 7b38167235..525f4ac4ad 100644
--- a/pom.xml
+++ b/pom.xml
@@ -97,7 +97,7 @@
     </distributionManagement>
 
     <properties>
-        <apache-beam.version>2.62.0</apache-beam.version>
+        <apache-beam.version>2.70.0</apache-beam.version>
         <assembly_appendId>false</assembly_appendId>
         <assembly_package-phase>package</assembly_package-phase>
         <attach-sources-phase>verify</attach-sources-phase>
diff --git a/tools/find_duplicate_jars.sh b/tools/find_duplicate_jars.sh
new file mode 100755
index 0000000000..ad43fbe3fe
--- /dev/null
+++ b/tools/find_duplicate_jars.sh
@@ -0,0 +1,149 @@
+#!/bin/bash
+
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#       http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# Report jar files that occur more than once in a built Hop distribution
+# (same artifact name, possibly different versions) and estimate how much
+# space would be saved by keeping only the largest copy of each.
+#
+# Usage: tools/find_duplicate_jars.sh [HOP_DIR]
+#   HOP_DIR defaults to assemblies/client/target/hop, resolved against the
+#   current working directory (run from the repository root).
+
+SEARCH_DIR="${1:-assemblies/client/target/hop}"
+
+# Check if the search directory exists
+if [ ! -d "$SEARCH_DIR" ]; then
+    echo "Error: Directory '$SEARCH_DIR' does not exist" >&2
+    exit 1
+fi
+
+# With "/" as the field separator, $NF is the bare jar file name and $0 the full path.
+find "$SEARCH_DIR" -type f -name "*.jar" | awk -F/ '
+    # Size of a file in bytes (0 if it cannot be read). Uses "wc -c", which
+    # works with both GNU and BSD userlands; "stat -f%z" is BSD/macOS only.
+    function get_size(path,   cmd, s) {
+      cmd = "wc -c < \"" path "\""
+      cmd | getline s
+      close(cmd)
+      return s+0
+    }
+
+    # Format a byte count using binary units (B, KB, MB, GB).
+    function human(n) {
+      if (n < 1024) return n " B"
+      else if (n < 1048576) return sprintf("%.1f KB", n/1024)
+      else if (n < 1073741824) return sprintf("%.1f MB", n/1048576)
+      else return sprintf("%.1f GB", n/1073741824)
+    }
+
+    # Return the string c repeated n times.
+    function repeat(c, n,   s,i) {
+      s=""
+      for(i=1;i<=n;i++) s=s c
+      return s
+    }
+
+    # Per jar: strip the version suffix ("-<digit>...jar") to get the
+    # artifact base name, then accumulate count and sizes per base name.
+    {
+      full = $0
+      file = $NF
+      base = file
+      sub(/-[0-9][0-9A-Za-z._-]*\.jar$/, "", base)
+
+      size = get_size(full)
+
+      count[base]++
+      total_size[base] += size
+      if (size > max_size[base]) max_size[base] = size
+      if (!(base in example_file)) {
+        example_file[base] = file
+        example_size[base] = size
+      }
+    }
+
+    END {
+      # compute max widths (only base names seen more than once are reported)
+      max_count = 5
+      max_name = length("NAME")
+      max_example = length("EXAMPLE")
+      max_size_w = length("SIZE")
+      max_total = length("TOTAL")
+      max_saved = length("SAVED")
+
+      for (b in count) {
+        if (count[b] > 1) {
+          ex = example_file[b]
+          max_name = (length(b) > max_name ? length(b) : max_name)
+          max_example = (length(ex) > max_example ? length(ex) : max_example)
+          s_ex = human(example_size[b])
+          s_tot = human(total_size[b])
+          s_saved = human(total_size[b] - max_size[b])
+          max_size_w = (length(s_ex) > max_size_w ? length(s_ex) : max_size_w)
+          max_total = (length(s_tot) > max_total ? length(s_tot) : max_total)
+          max_saved = (length(s_saved) > max_saved ? length(s_saved) : max_saved)
+          # keeping only the largest copy would free the space of the others
+          saved_map[b] = total_size[b] - max_size[b]
+        }
+      }
+
+      # sort by saved descending, ties by name so the output is deterministic
+      # (simple O(n^2) exchange sort: macOS awk has no asort)
+      n = 0
+      for (b in saved_map) {
+        n++
+        sorted[n] = b
+      }
+      for(i=1;i<=n;i++)
+        for(j=i+1;j<=n;j++)
+          if(saved_map[sorted[i]] < saved_map[sorted[j]] ||
+             (saved_map[sorted[i]] == saved_map[sorted[j]] && sorted[i] > sorted[j])) {
+            t=sorted[i]; sorted[i]=sorted[j]; sorted[j]=t
+          }
+
+      # prepare format string
+      fmt = "%" max_count "s %-" max_name "s %-" max_example "s %" max_size_w "s %" max_total "s %" max_saved "s\n"
+
+      # print header
+      printf fmt, "COUNT", "NAME", "EXAMPLE", "SIZE", "TOTAL", "SAVED"
+
+      # separator
+      total_width = max_count + 1 + max_name + 1 + max_example + 1 + max_size_w + 1 + max_total + 1 + max_saved
+      printf "%s\n", repeat("-", total_width)
+
+      # print data rows
+      sum_saved=0
+      for(i=1;i<=n;i++){
+        b = sorted[i]
+        cnt = count[b]
+        ex_file = example_file[b]
+        ex_size = human(example_size[b])
+        tot = human(total_size[b])
+        saved = human(saved_map[b])
+        sum_saved += saved_map[b]
+
+        printf fmt, cnt, b, ex_file, ex_size, tot, saved
+      }
+
+      # footer
+      printf "%s\n", repeat("-", total_width)
+      printf "Total potential savings: %s\n", human(sum_saved)
+    }
+  '
+


Reply via email to