This is an automated email from the ASF dual-hosted git repository.
hansva pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/hop.git
The following commit(s) were added to refs/heads/main by this push:
new ae3dc18080 Upgrade beam version, fixes #6288 (#6289)
ae3dc18080 is described below
commit ae3dc180803a7b28b910df26e8bd01754392901d
Author: Hans Van Akelyen <[email protected]>
AuthorDate: Mon Jan 5 11:04:20 2026 +0100
Upgrade beam version, fixes #6288 (#6289)
* Upgrade beam version, fixes #6288
* Add script to find duplicate jars in hop folder
* update docs
* fix log4j errors
* cleanup some duplicate jars
fix pom
---
.../pipeline/beam/getting-started-with-beam.adoc | 10 ++
lib/pom.xml | 13 +-
plugins/engines/beam/pom.xml | 184 +++++++++++----------
plugins/engines/beam/src/assembly/assembly.xml | 1 +
.../hop/beam/engines/BeamPipelineEngine.java | 2 +
.../transforms/kinesis/BeamKinesisConsumeMeta.java | 3 +-
.../kinesis/BeamKinesisConsumeTransform.java | 31 +++-
.../transforms/kinesis/BeamKinesisProduceMeta.java | 3 +-
.../kinesis/BeamKinesisProduceTransform.java | 97 ++++++++---
.../kinesis/KinesisRecordToHopRowFn.java | 2 +-
plugins/tech/neo4j/pom.xml | 12 ++
plugins/tech/neo4j/src/assembly/assembly.xml | 2 +
pom.xml | 2 +-
tools/find_duplicate_jars.sh | 131 +++++++++++++++
14 files changed, 357 insertions(+), 136 deletions(-)
diff --git
a/docs/hop-user-manual/modules/ROOT/pages/pipeline/beam/getting-started-with-beam.adoc
b/docs/hop-user-manual/modules/ROOT/pages/pipeline/beam/getting-started-with-beam.adoc
index 80a25ee938..6ace00a5c5 100644
---
a/docs/hop-user-manual/modules/ROOT/pages/pipeline/beam/getting-started-with-beam.adoc
+++
b/docs/hop-user-manual/modules/ROOT/pages/pipeline/beam/getting-started-with-beam.adoc
@@ -52,6 +52,16 @@ Here is the documentation for the relevant plugins:
|===
|Hop version |Beam version |Spark version |Flink version
+|2.17.0
+|2.70.0
+|3.5.X (scala 2.12)
+|1.19.x
+
+|2.16.0
+|2.62.0
+|3.4.X (scala 2.12)
+|1.17.x
+
|2.9.0
|2.56.0
|3.4.X (scala 2.12)
diff --git a/lib/pom.xml b/lib/pom.xml
index 08c5024bb7..c285320a19 100644
--- a/lib/pom.xml
+++ b/lib/pom.xml
@@ -77,12 +77,12 @@
<google-api-common.version>2.13.0</google-api-common.version>
<google-api-services-bigquery.version>v2-rev20240229-2.0.0</google-api-services-bigquery.version>
<google-api-services-storage.version>v1-rev20240311-2.0.0</google-api-services-storage.version>
- <google-auth-library-credentials.version>1.30.0</google-auth-library-credentials.version>
- <google-auth-library-oauth2-http.version>1.30.0</google-auth-library-oauth2-http.version>
+ <google-auth-library-credentials.version>1.40.0</google-auth-library-credentials.version>
+ <google-auth-library-oauth2-http.version>1.40.0</google-auth-library-oauth2-http.version>
<google-cloud-storage.version>2.15.0</google-cloud-storage.version>
<google-http-client.version>1.42.3</google-http-client.version>
<google-oauth-client.version>1.34.1</google-oauth-client.version>
- <gson.version>2.11.0</gson.version>
+ <gson.version>2.12.1</gson.version>
<!--check Beam BOM to find matching version
https://repo1.maven.org/maven2/org/apache/beam/beam-runners-google-cloud-dataflow-java/xxx/beam-runners-google-cloud-dataflow-java-xxx.pom
Replace exclusions beam-vendor-guava-xxx-jre-->
<guava.version>33.3.1-jre</guava.version>
@@ -111,7 +111,7 @@
<kafka-schema-registry-client.version>7.6.1</kafka-schema-registry-client.version>
<kotlin.version>1.9.25</kotlin.version>
<kryo.version>5.3.0</kryo.version>
- <log4j.version>2.23.1</log4j.version>
+ <log4j.version>2.25.3</log4j.version>
<lz4.version>1.8.0</lz4.version>
<metrics.version>4.2.12</metrics.version>
<minlog.version>1.3.1</minlog.version>
@@ -230,11 +230,6 @@
<artifactId>aws-java-sdk-redshift</artifactId>
<version>${aws-java-sdk.version}</version>
</dependency>
- <dependency>
- <groupId>com.amazonaws</groupId>
- <artifactId>aws-java-sdk-s3</artifactId>
- <version>${aws-java-sdk.version}</version>
- </dependency>
<dependency>
<groupId>com.databricks</groupId>
<artifactId>spark-avro_2.11</artifactId>
diff --git a/plugins/engines/beam/pom.xml b/plugins/engines/beam/pom.xml
index e91120fb08..98fbbf18e8 100644
--- a/plugins/engines/beam/pom.xml
+++ b/plugins/engines/beam/pom.xml
@@ -321,7 +321,7 @@
</exclusion>
<exclusion>
<groupId>org.apache.beam</groupId>
- <artifactId>beam-vendor-grpc-1_60_1</artifactId>
+ <artifactId>beam-vendor-grpc-1_69_0</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.beam</groupId>
@@ -415,7 +415,7 @@
</exclusion>
<exclusion>
<groupId>org.apache.beam</groupId>
- <artifactId>beam-vendor-grpc-1_60_1</artifactId>
+ <artifactId>beam-vendor-grpc-1_69_0</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.beam</groupId>
@@ -521,7 +521,7 @@
</exclusion>
<exclusion>
<groupId>org.apache.beam</groupId>
- <artifactId>beam-vendor-grpc-1_60_1</artifactId>
+ <artifactId>beam-vendor-grpc-1_69_0</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.beam</groupId>
@@ -583,7 +583,7 @@
</exclusion>
<exclusion>
<groupId>org.apache.beam</groupId>
- <artifactId>beam-vendor-grpc-1_60_1</artifactId>
+ <artifactId>beam-vendor-grpc-1_69_0</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.beam</groupId>
@@ -593,22 +593,104 @@
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
</exclusion>
+ <exclusion>
+ <groupId>org.apache.logging.log4j</groupId>
+ <artifactId>log4j-api</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.logging.log4j</groupId>
+ <artifactId>log4j-core</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.logging.log4j</groupId>
+ <artifactId>log4j-slf4j2-impl</artifactId>
+ </exclusion>
<exclusion>
<groupId>org.checkerframework</groupId>
<artifactId>checker-qual</artifactId>
</exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.beam</groupId>
+ <artifactId>beam-sdks-java-extensions-join-library</artifactId>
+ <exclusions>
<exclusion>
- <groupId>org.slf4j</groupId>
+ <groupId>*</groupId>
<artifactId>*</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.beam</groupId>
- <artifactId>beam-sdks-java-extensions-join-library</artifactId>
+ <artifactId>beam-sdks-java-io-amazon-web-services2</artifactId>
<exclusions>
<exclusion>
- <groupId>*</groupId>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>*</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>com.google.guava</groupId>
+ <artifactId>*</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>com.google.protobuf</groupId>
+ <artifactId>protobuf-java</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>commons-codec</groupId>
+ <artifactId>commons-codec</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>commons-io</groupId>
+ <artifactId>commons-io</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>commons-lang</groupId>
+ <artifactId>commons-lang</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>commons-logging</groupId>
+ <artifactId>commons-logging</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>joda-time</groupId>
+ <artifactId>joda-time</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>net.bytebuddy</groupId>
+ <artifactId>byte-buddy</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.beam</groupId>
+ <artifactId>beam-sdks-java-core</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.beam</groupId>
+ <artifactId>beam-vendor-grpc-1_69_0</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.beam</groupId>
+ <artifactId>beam-vendor-guava-33.1.0-jre-jre</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.commons</groupId>
+ <artifactId>commons-lang3</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.httpcomponents</groupId>
+ <artifactId>*</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.checkerframework</groupId>
+ <artifactId>checker-qual</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.conscrypt</groupId>
+ <artifactId>conscrypt-openjdk-uber</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.slf4j</groupId>
<artifactId>*</artifactId>
</exclusion>
</exclusions>
@@ -809,7 +891,7 @@
</exclusion>
<exclusion>
<groupId>org.apache.beam</groupId>
- <artifactId>beam-vendor-grpc-1_60_1</artifactId>
+ <artifactId>beam-vendor-grpc-1_69_0</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.beam</groupId>
@@ -953,7 +1035,7 @@
</exclusion>
<exclusion>
<groupId>org.apache.beam</groupId>
- <artifactId>beam-vendor-grpc-1_60_1</artifactId>
+ <artifactId>beam-vendor-grpc-1_69_0</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.beam</groupId>
@@ -977,68 +1059,6 @@
</exclusion>
</exclusions>
</dependency>
- <dependency>
- <groupId>org.apache.beam</groupId>
- <artifactId>beam-sdks-java-io-kinesis</artifactId>
- <exclusions>
- <exclusion>
- <groupId>com.fasterxml.jackson.core</groupId>
- <artifactId>*</artifactId>
- </exclusion>
- <exclusion>
- <groupId>com.google.guava</groupId>
- <artifactId>*</artifactId>
- </exclusion>
- <exclusion>
- <groupId>com.google.protobuf</groupId>
- <artifactId>protobuf-java</artifactId>
- </exclusion>
- <exclusion>
- <groupId>commons-codec</groupId>
- <artifactId>commons-codec</artifactId>
- </exclusion>
- <exclusion>
- <groupId>commons-io</groupId>
- <artifactId>commons-io</artifactId>
- </exclusion>
- <exclusion>
- <groupId>commons-lang</groupId>
- <artifactId>commons-lang</artifactId>
- </exclusion>
- <exclusion>
- <groupId>commons-logging</groupId>
- <artifactId>commons-logging</artifactId>
- </exclusion>
- <exclusion>
- <groupId>joda-time</groupId>
- <artifactId>joda-time</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.apache.beam</groupId>
- <artifactId>beam-sdks-java-core</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.apache.beam</groupId>
- <artifactId>beam-vendor-guava-33.1.0-jre-jre</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.apache.commons</groupId>
- <artifactId>commons-lang3</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.apache.httpcomponents</groupId>
- <artifactId>*</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.checkerframework</groupId>
- <artifactId>checker-qual</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.slf4j</groupId>
- <artifactId>*</artifactId>
- </exclusion>
- </exclusions>
- </dependency>
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-sdks-java-io-synthetic</artifactId>
@@ -1154,10 +1174,6 @@
<groupId>jakarta.servlet</groupId>
<artifactId>jakarta.servlet-api</artifactId>
</exclusion>
- <exclusion>
- <groupId>log4j</groupId>
- <artifactId>log4j</artifactId>
- </exclusion>
<exclusion>
<groupId>net.minidev</groupId>
<artifactId>json-smart</artifactId>
@@ -1206,6 +1222,10 @@
<groupId>org.apache.httpcomponents</groupId>
<artifactId>*</artifactId>
</exclusion>
+ <exclusion>
+ <groupId>org.apache.logging.log4j</groupId>
+ <artifactId>log4j</artifactId>
+ </exclusion>
<exclusion>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
@@ -1309,10 +1329,6 @@
<groupId>jakarta.servlet</groupId>
<artifactId>jakarta.servlet-api</artifactId>
</exclusion>
- <exclusion>
- <groupId>log4j</groupId>
- <artifactId>log4j</artifactId>
- </exclusion>
<exclusion>
<groupId>net.minidev</groupId>
<artifactId>json-smart</artifactId>
@@ -1361,6 +1377,10 @@
<groupId>org.apache.httpcomponents</groupId>
<artifactId>*</artifactId>
</exclusion>
+ <exclusion>
+ <groupId>org.apache.logging.log4j</groupId>
+ <artifactId>log4j</artifactId>
+ </exclusion>
<exclusion>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
@@ -1379,12 +1399,6 @@
<groupId>org.apache.beam</groupId>
<artifactId>beam-sdks-java-core</artifactId>
<scope>provided</scope>
- <exclusions>
- <exclusion>
- <groupId>org.xerial.snappy</groupId>
- <artifactId>snappy-java</artifactId>
- </exclusion>
- </exclusions>
</dependency>
<dependency>
<groupId>org.apache.hop</groupId>
diff --git a/plugins/engines/beam/src/assembly/assembly.xml
b/plugins/engines/beam/src/assembly/assembly.xml
index c3cbc46668..9489e8d482 100644
--- a/plugins/engines/beam/src/assembly/assembly.xml
+++ b/plugins/engines/beam/src/assembly/assembly.xml
@@ -153,6 +153,7 @@
<exclude>io.confluent::jar</exclude>
<exclude>com.google.code.gson:gson:jar</exclude>
<exclude>org.apache.commons:commons-collections4:jar</exclude>
+ <exclude>org.jspecify:jspecify:jar</exclude>
</excludes>
<outputDirectory>lib/beam</outputDirectory>
</dependencySet>
diff --git
a/plugins/engines/beam/src/main/java/org/apache/hop/beam/engines/BeamPipelineEngine.java
b/plugins/engines/beam/src/main/java/org/apache/hop/beam/engines/BeamPipelineEngine.java
index 59bf473450..6f48450192 100644
---
a/plugins/engines/beam/src/main/java/org/apache/hop/beam/engines/BeamPipelineEngine.java
+++
b/plugins/engines/beam/src/main/java/org/apache/hop/beam/engines/BeamPipelineEngine.java
@@ -98,6 +98,8 @@ public abstract class BeamPipelineEngine extends Variables
static MetricResults EMPTY_METRIC_RESULTS =
new DefaultMetricResults(
+ Collections.emptyList(),
+ Collections.emptyList(),
Collections.emptyList(),
Collections.emptyList(),
Collections.emptyList(),
diff --git
a/plugins/engines/beam/src/main/java/org/apache/hop/beam/transforms/kinesis/BeamKinesisConsumeMeta.java
b/plugins/engines/beam/src/main/java/org/apache/hop/beam/transforms/kinesis/BeamKinesisConsumeMeta.java
index 2da4d7e6c7..95281325ae 100644
---
a/plugins/engines/beam/src/main/java/org/apache/hop/beam/transforms/kinesis/BeamKinesisConsumeMeta.java
+++
b/plugins/engines/beam/src/main/java/org/apache/hop/beam/transforms/kinesis/BeamKinesisConsumeMeta.java
@@ -18,7 +18,6 @@
package org.apache.hop.beam.transforms.kinesis;
-import com.amazonaws.regions.Regions;
import java.util.List;
import java.util.Map;
import org.apache.beam.sdk.values.PCollection;
@@ -256,7 +255,7 @@ public class BeamKinesisConsumeMeta extends
BaseTransformMeta<BeamKinesisConsume
transformMeta.getName(),
variables.resolve(accessKey),
variables.resolve(secretKey),
- Regions.DEFAULT_REGION, // TODO : make configurable
+ "us-east-1", // TODO : make configurable
JsonRowMeta.toJson(outputRowMeta),
variables.resolve(streamName),
variables.resolve(uniqueIdField),
diff --git
a/plugins/engines/beam/src/main/java/org/apache/hop/beam/transforms/kinesis/BeamKinesisConsumeTransform.java
b/plugins/engines/beam/src/main/java/org/apache/hop/beam/transforms/kinesis/BeamKinesisConsumeTransform.java
index 07c2caeb6e..a55881cc43 100644
---
a/plugins/engines/beam/src/main/java/org/apache/hop/beam/transforms/kinesis/BeamKinesisConsumeTransform.java
+++
b/plugins/engines/beam/src/main/java/org/apache/hop/beam/transforms/kinesis/BeamKinesisConsumeTransform.java
@@ -17,11 +17,10 @@
package org.apache.hop.beam.transforms.kinesis;
-import com.amazonaws.regions.Regions;
-import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream;
import org.apache.avro.generic.GenericRecord;
-import org.apache.beam.sdk.io.kinesis.KinesisIO;
-import org.apache.beam.sdk.io.kinesis.KinesisRecord;
+import org.apache.beam.sdk.io.aws2.common.ClientConfiguration;
+import org.apache.beam.sdk.io.aws2.kinesis.KinesisIO;
+import org.apache.beam.sdk.io.aws2.kinesis.KinesisRecord;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.transforms.DoFn;
@@ -39,6 +38,10 @@ import org.apache.hop.pipeline.Pipeline;
import org.joda.time.Duration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
+import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
+import software.amazon.awssdk.regions.Region;
+import software.amazon.kinesis.common.InitialPositionInStream;
public class BeamKinesisConsumeTransform extends PTransform<PBegin, PCollection<HopRow>> {
@@ -49,7 +52,7 @@ public class BeamKinesisConsumeTransform extends
PTransform<PBegin, PCollection<
private String accessKey;
private String secretKey;
- private Regions regions;
+ private String region;
private String streamName;
private String uniqueIdField;
private String dataField;
@@ -80,7 +83,7 @@ public class BeamKinesisConsumeTransform extends
PTransform<PBegin, PCollection<
String transformName,
String accessKey,
String secretKey,
- Regions regions,
+ String region,
String rowMetaJson,
String streamName,
String uniqueIdField,
@@ -107,7 +110,7 @@ public class BeamKinesisConsumeTransform extends
PTransform<PBegin, PCollection<
this.streamName = streamName;
this.accessKey = accessKey;
this.secretKey = secretKey;
- this.regions = regions;
+ this.region = region;
this.uniqueIdField = uniqueIdField;
this.dataField = dataField;
this.dataType = dataType;
@@ -146,9 +149,19 @@ public class BeamKinesisConsumeTransform extends
PTransform<PBegin, PCollection<
PCollection<HopRow> output;
- KinesisIO.Read<KinesisRecord> kinesisRecordRead =
+ StaticCredentialsProvider credentialsProvider =
+ StaticCredentialsProvider.create(AwsBasicCredentials.create(accessKey, secretKey));
+ Region awsRegion = Region.of(region);
+
+ ClientConfiguration clientConfig =
+ ClientConfiguration.builder()
+ .credentialsProvider(credentialsProvider)
+ .region(awsRegion)
+ .build();
+
+ KinesisIO.Read kinesisRecordRead =
KinesisIO.read()
- .withAWSClientsProvider(accessKey, secretKey, regions)
+ .withClientConfiguration(clientConfig)
.withStreamName(streamName)
.withInitialPositionInStream(InitialPositionInStream.LATEST) // TODO make configurable
;
diff --git
a/plugins/engines/beam/src/main/java/org/apache/hop/beam/transforms/kinesis/BeamKinesisProduceMeta.java
b/plugins/engines/beam/src/main/java/org/apache/hop/beam/transforms/kinesis/BeamKinesisProduceMeta.java
index f063f5028b..f78db90f69 100644
---
a/plugins/engines/beam/src/main/java/org/apache/hop/beam/transforms/kinesis/BeamKinesisProduceMeta.java
+++
b/plugins/engines/beam/src/main/java/org/apache/hop/beam/transforms/kinesis/BeamKinesisProduceMeta.java
@@ -17,7 +17,6 @@
package org.apache.hop.beam.transforms.kinesis;
-import com.amazonaws.regions.Regions;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
@@ -146,7 +145,7 @@ public class BeamKinesisProduceMeta extends
BaseTransformMeta<BeamKinesisProduce
JsonRowMeta.toJson(rowMeta),
variables.resolve(accessKey),
variables.resolve(secretKey),
- Regions.DEFAULT_REGION,
+ "us-east-1",
variables.resolve(streamName),
dataFieldName,
variables.resolve(dataType),
diff --git
a/plugins/engines/beam/src/main/java/org/apache/hop/beam/transforms/kinesis/BeamKinesisProduceTransform.java
b/plugins/engines/beam/src/main/java/org/apache/hop/beam/transforms/kinesis/BeamKinesisProduceTransform.java
index b038987dce..9cca6b8901 100644
---
a/plugins/engines/beam/src/main/java/org/apache/hop/beam/transforms/kinesis/BeamKinesisProduceTransform.java
+++
b/plugins/engines/beam/src/main/java/org/apache/hop/beam/transforms/kinesis/BeamKinesisProduceTransform.java
@@ -17,18 +17,19 @@
package org.apache.hop.beam.transforms.kinesis;
-import com.amazonaws.regions.Regions;
import java.util.ArrayList;
import java.util.List;
-import java.util.Properties;
-import org.apache.beam.sdk.io.kinesis.KinesisIO;
+import org.apache.beam.sdk.io.aws2.common.ClientConfiguration;
+import org.apache.beam.sdk.io.aws2.kinesis.KinesisIO;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PDone;
+import org.apache.commons.lang.StringUtils;
import org.apache.hop.beam.core.BeamHop;
import org.apache.hop.beam.core.HopRow;
import org.apache.hop.core.exception.HopException;
@@ -38,6 +39,9 @@ import org.apache.hop.core.row.JsonRowMeta;
import org.apache.hop.pipeline.Pipeline;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
+import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
+import software.amazon.awssdk.regions.Region;
public class BeamKinesisProduceTransform extends PTransform<PCollection<HopRow>, PDone> {
@@ -46,7 +50,7 @@ public class BeamKinesisProduceTransform extends
PTransform<PCollection<HopRow>,
private String accessKey;
private String secretKey;
- private Regions regions;
+ private String region;
private String streamName;
private String dataField;
private String dataType;
@@ -66,7 +70,7 @@ public class BeamKinesisProduceTransform extends
PTransform<PCollection<HopRow>,
String rowMetaJson,
String accessKey,
String secretKey,
- Regions regions,
+ String region,
String streamName,
String dataField,
String dataType,
@@ -80,7 +84,7 @@ public class BeamKinesisProduceTransform extends
PTransform<PCollection<HopRow>,
this.rowMetaJson = rowMetaJson;
this.accessKey = accessKey;
this.secretKey = secretKey;
- this.regions = regions;
+ this.region = region;
this.streamName = streamName;
this.dataField = dataField;
this.dataType = dataType;
@@ -114,27 +118,49 @@ public class BeamKinesisProduceTransform extends
PTransform<PCollection<HopRow>,
throw new HopException("For now, only Strings are supported as Kinesis data messages");
}
- // Add custom configuration options to this map:
- Properties producerProperties = new Properties();
- for (KinesisConfigOption configOption : configOptions) {
- producerProperties.put(configOption.getParameter(), configOption.getValue());
- }
+ // Note: Producer properties are not directly supported in the new aws2 API
+ // Custom configuration options would need to be set through ClientConfiguration
+ // if needed in the future
- // Convert to PCollection of KV<String, byte[]>
+ // Convert to PCollection of KV<String, byte[]> where key is partition key
//
- PCollection<byte[]> messages =
- input.apply(ParDo.of(new HopRowToMessage(transformName, rowMetaJson, messageIndex)));
+ int partitionKeyIndex = -1;
+ if (StringUtils.isNotEmpty(partitionKey)) {
+ partitionKeyIndex = rowMeta.indexOfValue(partitionKey);
+ if (partitionKeyIndex < 0) {
+ throw new HopException(
+ "Unable to find partition key field "
+ + partitionKey
+ + " in input row: "
+ + rowMeta.toString());
+ }
+ }
+
+ PCollection<KV<String, byte[]>> messages =
+ input.apply(
+ ParDo.of(
+ new HopRowToKVMessage(
+ transformName, rowMetaJson, messageIndex, partitionKeyIndex)));
- // Write to Kinesis stream with <String, byte[]>
+ // Write to Kinesis stream with KV<String, byte[]>
//
- KinesisIO.Write write =
- KinesisIO.write()
- .withAWSClientsProvider(accessKey, secretKey, regions)
- .withStreamName(streamName)
- .withPartitionKey(partitionKey)
- .withProducerProperties(producerProperties);
-
- return messages.apply(write);
+ StaticCredentialsProvider credentialsProvider =
+ StaticCredentialsProvider.create(AwsBasicCredentials.create(accessKey, secretKey));
+ Region awsRegion = Region.of(region);
+
+ ClientConfiguration clientConfig =
+ ClientConfiguration.builder()
+ .credentialsProvider(credentialsProvider)
+ .region(awsRegion)
+ .build();
+
+ KinesisIO.Write<KV<String, byte[]>> write =
+ KinesisIO.<KV<String, byte[]>>write()
+ .withClientConfiguration(clientConfig)
+ .withStreamName(streamName);
+
+ messages.apply(write);
+ return PDone.in(messages.getPipeline());
} catch (Exception e) {
numErrors.inc();
LOG.error("Error in Beam Kinesis Produce transform", e);
@@ -142,21 +168,25 @@ public class BeamKinesisProduceTransform extends
PTransform<PCollection<HopRow>,
}
}
- // Simply convert HopRow to byte[]
+ // Convert HopRow to KV<String, byte[]> where key is partition key
//
- private static class HopRowToMessage extends DoFn<HopRow, byte[]> {
+ private static class HopRowToKVMessage extends DoFn<HopRow, KV<String, byte[]>> {
private final int messageIndex;
+ private final int partitionKeyIndex;
private final String transformName;
private final String rowMetaJson;
private transient IValueMeta valueMeta;
+ private transient IValueMeta partitionKeyValueMeta;
private transient Counter outputCounter;
private transient Counter readCounter;
- public HopRowToMessage(String transformName, String rowMetaJson, int messageIndex) {
+ public HopRowToKVMessage(
+ String transformName, String rowMetaJson, int messageIndex, int partitionKeyIndex) {
this.transformName = transformName;
this.rowMetaJson = rowMetaJson;
this.messageIndex = messageIndex;
+ this.partitionKeyIndex = partitionKeyIndex;
}
@Setup
@@ -170,6 +200,9 @@ public class BeamKinesisProduceTransform extends
PTransform<PCollection<HopRow>,
BeamHop.init();
IRowMeta rowMeta = JsonRowMeta.fromJson(rowMetaJson);
valueMeta = rowMeta.getValueMeta(messageIndex);
+ if (partitionKeyIndex >= 0) {
+ partitionKeyValueMeta = rowMeta.getValueMeta(partitionKeyIndex);
+ }
Metrics.counter(Pipeline.METRIC_NAME_INIT, transformName).inc();
} catch (Exception e) {
@@ -190,7 +223,17 @@ public class BeamKinesisProduceTransform extends
PTransform<PCollection<HopRow>,
//
try {
byte[] message = valueMeta.getBinary(hopRow.getRow()[messageIndex]);
- processContext.output(message);
+
+ // Extract partition key
+ String partitionKeyValue = "";
+ if (partitionKeyIndex >= 0) {
+ partitionKeyValue = partitionKeyValueMeta.getString(hopRow.getRow()[partitionKeyIndex]);
+ if (partitionKeyValue == null) {
+ partitionKeyValue = "";
+ }
+ }
+
+ processContext.output(KV.of(partitionKeyValue, message));
outputCounter.inc();
} catch (Exception e) {
throw new RuntimeException(
diff --git
a/plugins/engines/beam/src/main/java/org/apache/hop/beam/transforms/kinesis/KinesisRecordToHopRowFn.java
b/plugins/engines/beam/src/main/java/org/apache/hop/beam/transforms/kinesis/KinesisRecordToHopRowFn.java
index 4e36bce3ed..ea30fd8ff6 100644
---
a/plugins/engines/beam/src/main/java/org/apache/hop/beam/transforms/kinesis/KinesisRecordToHopRowFn.java
+++
b/plugins/engines/beam/src/main/java/org/apache/hop/beam/transforms/kinesis/KinesisRecordToHopRowFn.java
@@ -19,7 +19,7 @@
package org.apache.hop.beam.transforms.kinesis;
import java.nio.charset.StandardCharsets;
-import org.apache.beam.sdk.io.kinesis.KinesisRecord;
+import org.apache.beam.sdk.io.aws2.kinesis.KinesisRecord;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.transforms.DoFn;
diff --git a/plugins/tech/neo4j/pom.xml b/plugins/tech/neo4j/pom.xml
index c5d676b5f6..f052a53c1b 100755
--- a/plugins/tech/neo4j/pom.xml
+++ b/plugins/tech/neo4j/pom.xml
@@ -32,6 +32,18 @@
<neo4j.driver.version>5.15.0</neo4j.driver.version>
</properties>
+ <dependencyManagement>
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.hop</groupId>
+ <artifactId>hop-libs</artifactId>
+ <version>${project.version}</version>
+ <type>pom</type>
+ <scope>import</scope>
+ </dependency>
+ </dependencies>
+ </dependencyManagement>
+
<dependencies>
<dependency>
<groupId>org.neo4j.driver</groupId>
diff --git a/plugins/tech/neo4j/src/assembly/assembly.xml
b/plugins/tech/neo4j/src/assembly/assembly.xml
index 86c679e05c..55aa8ff693 100644
--- a/plugins/tech/neo4j/src/assembly/assembly.xml
+++ b/plugins/tech/neo4j/src/assembly/assembly.xml
@@ -51,6 +51,7 @@
<excludes>
<exclude>org.apache.hop:hop-tech-neo4j:jar</exclude>
<exclude>org.reactivestreams:reactive-streams:jar</exclude>
+ <exclude>io.netty::jar</exclude>
</excludes>
<outputDirectory>plugins/tech/neo4j/lib</outputDirectory>
</dependencySet>
@@ -58,6 +59,7 @@
<scope>runtime</scope>
<includes>
<include>org.reactivestreams:reactive-streams:jar</include>
+ <include>io.netty::jar</include>
</includes>
<outputDirectory>lib/core</outputDirectory>
</dependencySet>
diff --git a/pom.xml b/pom.xml
index 7b38167235..525f4ac4ad 100644
--- a/pom.xml
+++ b/pom.xml
@@ -97,7 +97,7 @@
</distributionManagement>
<properties>
- <apache-beam.version>2.62.0</apache-beam.version>
+ <apache-beam.version>2.70.0</apache-beam.version>
<assembly_appendId>false</assembly_appendId>
<assembly_package-phase>package</assembly_package-phase>
<attach-sources-phase>verify</attach-sources-phase>
diff --git a/tools/find_duplicate_jars.sh b/tools/find_duplicate_jars.sh
new file mode 100755
index 0000000000..ad43fbe3fe
--- /dev/null
+++ b/tools/find_duplicate_jars.sh
@@ -0,0 +1,131 @@
+#!/bin/bash
+
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+SEARCH_DIR="${1:-assemblies/client/target/hop}" # optional first argument overrides the default scan root
+
+# Check if the search directory exists
+if [ ! -d "$SEARCH_DIR" ]; then
+  echo "Error: Directory '$SEARCH_DIR' does not exist" >&2
+  exit 1
+fi
+
+find "$SEARCH_DIR" -type f -name "*.jar" | awk -F/ '
+  function get_size(path,    cmd, s) {  # size of the file at path in bytes (0 if it cannot be read); cmd and s are locals
+    cmd = "wc -c < \"" path "\""  # POSIX byte count; "stat -f%z" only works with BSD/macOS stat and fails with GNU stat
+    cmd | getline s
+    close(cmd)
+    return s+0  # numeric coercion also drops the leading padding some wc builds print
+  }
+
+  function human(n) {  # render a byte count with a B/KB/MB/GB unit
+    if (n < 1024) return n " B"
+    else if (n < 1048576) return sprintf("%.1f KB", n/1024)
+    else if (n < 1073741824) return sprintf("%.1f MB", n/1048576)
+    else return sprintf("%.1f GB", n/1073741824)
+  }
+
+  function repeat(c, n,    s, i) {  # string made of n copies of c; s and i are locals
+    s=""
+    for(i=1;i<=n;i++) s=s c
+    return s
+  }
+
+  {
+    full = $0
+    file = $NF
+    base = file
+    sub(/-[0-9][0-9A-Za-z._-]*\.jar$/, "", base)  # drop "-<version>.jar" so all versions of one artifact share a key
+
+    size = get_size(full)
+
+    count[base]++
+    total_size[base] += size
+    if (size > max_size[base]) max_size[base] = size
+    if (!(base in example_file)) {
+      example_file[base] = file
+      example_size[base] = size
+    }
+  }
+
+  END {
+    # compute max widths
+    max_count = 5
+    max_name = length("NAME")
+    max_example = length("EXAMPLE")
+    max_size_w = length("SIZE")
+    max_total = length("TOTAL")
+    max_saved = length("SAVED")
+
+    for (b in count) {
+      if (count[b] > 1) {
+        ex = example_file[b]
+        max_name = (length(b) > max_name ? length(b) : max_name)
+        max_example = (length(ex) > max_example ? length(ex) : max_example)
+        s_ex = human(example_size[b])
+        s_tot = human(total_size[b])
+        s_saved = human(total_size[b] - max_size[b])
+        max_size_w = (length(s_ex) > max_size_w ? length(s_ex) : max_size_w)
+        max_total = (length(s_tot) > max_total ? length(s_tot) : max_total)
+        max_saved = (length(s_saved) > max_saved ? length(s_saved) : max_saved)
+        saved_map[b] = total_size[b] - max_size[b]  # bytes reclaimed if only the largest copy is kept
+      }
+    }
+
+    # sort by saved descending (bubble sort for macOS awk)
+    n = 0
+    for (b in saved_map) {
+      n++
+      sorted[n] = b
+    }
+    for(i=1;i<=n;i++)
+      for(j=i+1;j<=n;j++)
+        if(saved_map[sorted[i]] < saved_map[sorted[j]]) {
+          t=sorted[i]; sorted[i]=sorted[j]; sorted[j]=t
+        }
+
+    # prepare format string
+    fmt = "%" max_count "s %-" max_name "s %-" max_example "s %" max_size_w "s %" max_total "s %" max_saved "s\n"
+
+    # print header
+    printf fmt, "COUNT", "NAME", "EXAMPLE", "SIZE", "TOTAL", "SAVED"
+
+    # separator
+    total_width = max_count + 1 + max_name + 1 + max_example + 1 + max_size_w + 1 + max_total + 1 + max_saved
+    printf "%s\n", repeat("-", total_width)
+
+    # print data rows
+    sum_saved=0
+    for(i=1;i<=n;i++){
+      b = sorted[i]
+      cnt = count[b]
+      ex_file = example_file[b]
+      ex_size = human(example_size[b])
+      tot = human(total_size[b])
+      saved = human(saved_map[b])
+      sum_saved += saved_map[b]
+
+      printf fmt, cnt, b, ex_file, ex_size, tot, saved
+    }
+
+    # footer
+    printf "%s\n", repeat("-", total_width)
+    printf "Total potential savings: %s\n", human(sum_saved)
+  }
+'
+