This is an automated email from the ASF dual-hosted git repository.
hansva pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/hop.git
The following commit(s) were added to refs/heads/main by this push:
new fa171f4c14 fold hop-engine-beam into hop-engines-beam, fixes #7235 (#7236)
fa171f4c14 is described below
commit fa171f4c14a880cd88bdfd0df177088e5bf5e661
Author: Hans Van Akelyen <[email protected]>
AuthorDate: Mon Jun 8 14:20:01 2026 +0200
fold hop-engine-beam into hop-engines-beam, fixes #7235 (#7236)
---
assemblies/client/pom.xml | 6 -
assemblies/client/src/assembly/assembly.xml | 1 -
assemblies/debug/pom.xml | 6 -
engine-beam/pom.xml | 120 ---------
engine-beam/src/assembly/assembly.xml | 32 ---
plugins/engines/beam/pom.xml | 6 -
.../org/apache/hop/beam/core/BeamDefaults.java | 0
.../java/org/apache/hop/beam/core/BeamHop.java | 0
.../main/java/org/apache/hop/beam/core/HopRow.java | 0
.../org/apache/hop/beam/core/fn/HopKeyValueFn.java | 0
.../IBeamPipelineEngineRunConfiguration.java | 0
.../org/apache/hop/beam/metadata/RunnerType.java | 0
.../HopPipelineMetaToBeamPipelineConverter.java | 8 +-
.../pipeline/IBeamPipelineTransformHandler.java | 0
.../handler/BeamMemoryGroupByTransformHandler.java | 114 ++++++++
plugins/transforms/memgroupby/pom.xml | 6 -
.../transforms/memgroupby/MemoryGroupByMeta.java | 103 +-------
.../memgroupby/beam/AggregationType.java | 42 ---
.../transforms/memgroupby/beam/GroupByFn.java | 286 ---------------------
.../memgroupby/beam/GroupByTransform.java | 205 ---------------
pom.xml | 4 +-
rest/pom.xml | 15 --
22 files changed, 123 insertions(+), 831 deletions(-)
diff --git a/assemblies/client/pom.xml b/assemblies/client/pom.xml
index cc58fee89f..1adf6ccf45 100644
--- a/assemblies/client/pom.xml
+++ b/assemblies/client/pom.xml
@@ -61,12 +61,6 @@
<version>${project.version}</version>
<type>zip</type>
</dependency>
- <dependency>
- <groupId>org.apache.hop</groupId>
- <artifactId>hop-engine-beam</artifactId>
- <version>${project.version}</version>
- <type>zip</type>
- </dependency>
<dependency>
<groupId>org.apache.hop</groupId>
<artifactId>hop-engines-beam</artifactId>
diff --git a/assemblies/client/src/assembly/assembly.xml b/assemblies/client/src/assembly/assembly.xml
index c149989804..d25f356db3 100644
--- a/assemblies/client/src/assembly/assembly.xml
+++ b/assemblies/client/src/assembly/assembly.xml
@@ -40,7 +40,6 @@
<include>org.apache.hop:hop-core:zip</include>
<include>org.apache.hop:hop-engine:zip</include>
<include>org.apache.hop:hop-ui:zip</include>
- <include>org.apache.hop:hop-engine-beam:zip</include>
<include>org.apache.hop:hop-engines-beam:zip</include>
<include>org.apache.hop:hop-ui-rcp:zip</include>
</includes>
diff --git a/assemblies/debug/pom.xml b/assemblies/debug/pom.xml
index 1b4414b68b..0adb66780b 100644
--- a/assemblies/debug/pom.xml
+++ b/assemblies/debug/pom.xml
@@ -373,12 +373,6 @@
<version>${project.version}</version>
<scope>provided</scope>
</dependency>
- <dependency>
- <groupId>org.apache.hop</groupId>
- <artifactId>hop-engine-beam</artifactId>
- <version>${project.version}</version>
- <scope>provided</scope>
- </dependency>
<dependency>
<groupId>org.apache.hop</groupId>
<artifactId>hop-engines-beam</artifactId>
diff --git a/engine-beam/pom.xml b/engine-beam/pom.xml
deleted file mode 100644
index 8e1ecf99dd..0000000000
--- a/engine-beam/pom.xml
+++ /dev/null
@@ -1,120 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
- ~ Licensed to the Apache Software Foundation (ASF) under one or more
- ~ contributor license agreements. See the NOTICE file distributed with
- ~ this work for additional information regarding copyright ownership.
- ~ The ASF licenses this file to You under the Apache License, Version 2.0
- ~ (the "License"); you may not use this file except in compliance with
- ~ the License. You may obtain a copy of the License at
- ~
- ~ http://www.apache.org/licenses/LICENSE-2.0
- ~
- ~ Unless required by applicable law or agreed to in writing, software
- ~ distributed under the License is distributed on an "AS IS" BASIS,
- ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- ~ See the License for the specific language governing permissions and
- ~ limitations under the License.
- ~
- -->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
- <modelVersion>4.0.0</modelVersion>
- <parent>
- <groupId>org.apache.hop</groupId>
- <artifactId>hop</artifactId>
- <version>2.19.0-SNAPSHOT</version>
- </parent>
-
- <artifactId>hop-engine-beam</artifactId>
- <packaging>jar</packaging>
- <name>Hop Engine Beam</name>
-
- <dependencyManagement>
- <dependencies>
- <dependency>
- <groupId>org.apache.hop</groupId>
- <artifactId>hop-libs</artifactId>
- <version>${project.version}</version>
- <type>pom</type>
- <scope>import</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.beam</groupId>
- <artifactId>beam-sdks-java-google-cloud-platform-bom</artifactId>
- <version>${apache-beam.version}</version>
- <type>pom</type>
- <scope>import</scope>
- </dependency>
- </dependencies>
- </dependencyManagement>
-
- <dependencies>
- <dependency>
- <groupId>jakarta.servlet</groupId>
- <artifactId>jakarta.servlet-api</artifactId>
- <version>6.1.0</version>
- </dependency>
- <dependency>
- <groupId>org.apache.beam</groupId>
- <artifactId>beam-sdks-java-core</artifactId>
- <exclusions>
- <exclusion>
- <groupId>com.fasterxml.jackson.core</groupId>
- <artifactId>*</artifactId>
- </exclusion>
- <exclusion>
- <groupId>com.fasterxml.jackson.module</groupId>
- <artifactId>jackson-module-scala_2.11</artifactId>
- </exclusion>
- <exclusion>
- <groupId>com.fasterxml.jackson.module</groupId>
- <artifactId>jackson-module-scala_2.12</artifactId>
- </exclusion>
- <exclusion>
- <groupId>com.fasterxml.jackson.module</groupId>
- <artifactId>jackson-module-scala_2.13</artifactId>
- </exclusion>
- <exclusion>
- <groupId>com.google.errorprone</groupId>
- <artifactId>error_prone_annotations</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.apache.beam</groupId>
- <artifactId>beam-sdks-java-transform-service-launcher</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.apache.commons</groupId>
- <artifactId>commons-lang3</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.checkerframework</groupId>
- <artifactId>checker-qual</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.xerial.snappy</groupId>
- <artifactId>snappy-java</artifactId>
- </exclusion>
- </exclusions>
- </dependency>
- <dependency>
- <groupId>org.apache.hop</groupId>
- <artifactId>hop-core</artifactId>
- <version>${project.version}</version>
- <scope>provided</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.hop</groupId>
- <artifactId>hop-engine</artifactId>
- <version>${project.version}</version>
- <scope>provided</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.hop</groupId>
- <artifactId>hop-core</artifactId>
- <version>${project.version}</version>
- <classifier>tests</classifier>
- <type>test-jar</type>
- <scope>test</scope>
- </dependency>
- </dependencies>
-
-</project>
diff --git a/engine-beam/src/assembly/assembly.xml b/engine-beam/src/assembly/assembly.xml
deleted file mode 100644
index 0f58e2063f..0000000000
--- a/engine-beam/src/assembly/assembly.xml
+++ /dev/null
@@ -1,32 +0,0 @@
-<!--
- ~ Licensed to the Apache Software Foundation (ASF) under one or more
- ~ contributor license agreements. See the NOTICE file distributed with
- ~ this work for additional information regarding copyright ownership.
- ~ The ASF licenses this file to You under the Apache License, Version 2.0
- ~ (the "License"); you may not use this file except in compliance with
- ~ the License. You may obtain a copy of the License at
- ~
- ~ http://www.apache.org/licenses/LICENSE-2.0
- ~
- ~ Unless required by applicable law or agreed to in writing, software
- ~ distributed under the License is distributed on an "AS IS" BASIS,
- ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- ~ See the License for the specific language governing permissions and
- ~ limitations under the License.
- ~
- -->
-
-<assembly xmlns="http://maven.apache.org/ASSEMBLY/2.2.0"
- xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://maven.apache.org/ASSEMBLY/2.2.0 http://maven.apache.org/xsd/assembly-2.2.0.xsd">
- <id>hop-engine-beam</id>
- <formats>
- <format>zip</format>
- </formats>
- <baseDirectory>lib/core</baseDirectory>
- <dependencySets>
- <dependencySet>
- <outputDirectory>.</outputDirectory>
- </dependencySet>
- </dependencySets>
-</assembly>
\ No newline at end of file
diff --git a/plugins/engines/beam/pom.xml b/plugins/engines/beam/pom.xml
index 9edf758f89..466c97e1cf 100644
--- a/plugins/engines/beam/pom.xml
+++ b/plugins/engines/beam/pom.xml
@@ -1437,12 +1437,6 @@
<artifactId>beam-sdks-java-core</artifactId>
<scope>provided</scope>
</dependency>
- <dependency>
- <groupId>org.apache.hop</groupId>
- <artifactId>hop-engine-beam</artifactId>
- <version>${project.version}</version>
- <scope>provided</scope>
- </dependency>
<dependency>
<groupId>org.apache.hop</groupId>
<artifactId>hop-transform-constant</artifactId>
diff --git a/engine-beam/src/main/java/org/apache/hop/beam/core/BeamDefaults.java b/plugins/engines/beam/src/main/java/org/apache/hop/beam/core/BeamDefaults.java
similarity index 100%
rename from engine-beam/src/main/java/org/apache/hop/beam/core/BeamDefaults.java
rename to plugins/engines/beam/src/main/java/org/apache/hop/beam/core/BeamDefaults.java
diff --git a/engine-beam/src/main/java/org/apache/hop/beam/core/BeamHop.java b/plugins/engines/beam/src/main/java/org/apache/hop/beam/core/BeamHop.java
similarity index 100%
rename from engine-beam/src/main/java/org/apache/hop/beam/core/BeamHop.java
rename to plugins/engines/beam/src/main/java/org/apache/hop/beam/core/BeamHop.java
diff --git a/engine-beam/src/main/java/org/apache/hop/beam/core/HopRow.java b/plugins/engines/beam/src/main/java/org/apache/hop/beam/core/HopRow.java
similarity index 100%
rename from engine-beam/src/main/java/org/apache/hop/beam/core/HopRow.java
rename to plugins/engines/beam/src/main/java/org/apache/hop/beam/core/HopRow.java
diff --git a/engine-beam/src/main/java/org/apache/hop/beam/core/fn/HopKeyValueFn.java b/plugins/engines/beam/src/main/java/org/apache/hop/beam/core/fn/HopKeyValueFn.java
similarity index 100%
rename from engine-beam/src/main/java/org/apache/hop/beam/core/fn/HopKeyValueFn.java
rename to plugins/engines/beam/src/main/java/org/apache/hop/beam/core/fn/HopKeyValueFn.java
diff --git a/engine-beam/src/main/java/org/apache/hop/beam/engines/IBeamPipelineEngineRunConfiguration.java b/plugins/engines/beam/src/main/java/org/apache/hop/beam/engines/IBeamPipelineEngineRunConfiguration.java
similarity index 100%
rename from engine-beam/src/main/java/org/apache/hop/beam/engines/IBeamPipelineEngineRunConfiguration.java
rename to plugins/engines/beam/src/main/java/org/apache/hop/beam/engines/IBeamPipelineEngineRunConfiguration.java
diff --git a/engine-beam/src/main/java/org/apache/hop/beam/metadata/RunnerType.java b/plugins/engines/beam/src/main/java/org/apache/hop/beam/metadata/RunnerType.java
similarity index 100%
rename from engine-beam/src/main/java/org/apache/hop/beam/metadata/RunnerType.java
rename to plugins/engines/beam/src/main/java/org/apache/hop/beam/metadata/RunnerType.java
diff --git a/plugins/engines/beam/src/main/java/org/apache/hop/beam/pipeline/HopPipelineMetaToBeamPipelineConverter.java b/plugins/engines/beam/src/main/java/org/apache/hop/beam/pipeline/HopPipelineMetaToBeamPipelineConverter.java
index cc78c9c62f..d1048c7fda 100644
--- a/plugins/engines/beam/src/main/java/org/apache/hop/beam/pipeline/HopPipelineMetaToBeamPipelineConverter.java
+++ b/plugins/engines/beam/src/main/java/org/apache/hop/beam/pipeline/HopPipelineMetaToBeamPipelineConverter.java
@@ -43,6 +43,7 @@ import org.apache.hop.beam.engines.IBeamPipelineEngineRunConfiguration;
import org.apache.hop.beam.engines.dataflow.BeamDataFlowPipelineRunConfiguration;
import org.apache.hop.beam.metadata.RunnerType;
import org.apache.hop.beam.pipeline.handler.BeamGenericTransformHandler;
+import org.apache.hop.beam.pipeline.handler.BeamMemoryGroupByTransformHandler;
import org.apache.hop.beam.pipeline.handler.BeamMergeJoinTransformHandler;
import org.apache.hop.beam.pipeline.handler.BeamRowGeneratorTransformHandler;
import org.apache.hop.beam.util.BeamConst;
@@ -81,7 +82,10 @@ public class HopPipelineMetaToBeamPipelineConverter {
* {@link #addDefaultTransformHandlers()}.
*/
public static final Set<String> EXPLICIT_HANDLER_PLUGIN_IDS =
- Set.of(BeamConst.STRING_MERGE_JOIN_PLUGIN_ID, BeamConst.STRING_BEAM_ROW_GENERATOR_PLUGIN_ID);
+ Set.of(
+ BeamConst.STRING_MERGE_JOIN_PLUGIN_ID,
+ BeamConst.STRING_BEAM_ROW_GENERATOR_PLUGIN_ID,
+ BeamConst.STRING_MEMORY_GROUP_BY_PLUGIN_ID);
/**
* Transform meta classes that Beam refuses to run at all, mapped to the user-facing reason. The
@@ -233,6 +237,8 @@ public class HopPipelineMetaToBeamPipelineConverter {
BeamConst.STRING_MERGE_JOIN_PLUGIN_ID, new BeamMergeJoinTransformHandler());
transformHandlers.put(
BeamConst.STRING_BEAM_ROW_GENERATOR_PLUGIN_ID, new BeamRowGeneratorTransformHandler());
+ transformHandlers.put(
+ BeamConst.STRING_MEMORY_GROUP_BY_PLUGIN_ID, new BeamMemoryGroupByTransformHandler());
genericTransformHandler = new BeamGenericTransformHandler();
}
diff --git a/engine-beam/src/main/java/org/apache/hop/beam/pipeline/IBeamPipelineTransformHandler.java b/plugins/engines/beam/src/main/java/org/apache/hop/beam/pipeline/IBeamPipelineTransformHandler.java
similarity index 100%
rename from engine-beam/src/main/java/org/apache/hop/beam/pipeline/IBeamPipelineTransformHandler.java
rename to plugins/engines/beam/src/main/java/org/apache/hop/beam/pipeline/IBeamPipelineTransformHandler.java
diff --git a/plugins/engines/beam/src/main/java/org/apache/hop/beam/pipeline/handler/BeamMemoryGroupByTransformHandler.java b/plugins/engines/beam/src/main/java/org/apache/hop/beam/pipeline/handler/BeamMemoryGroupByTransformHandler.java
new file mode 100644
index 0000000000..f552ad0053
--- /dev/null
+++ b/plugins/engines/beam/src/main/java/org/apache/hop/beam/pipeline/handler/BeamMemoryGroupByTransformHandler.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hop.beam.pipeline.handler;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.hop.beam.core.HopRow;
+import org.apache.hop.beam.core.transform.GroupByTransform;
+import org.apache.hop.beam.engines.IBeamPipelineEngineRunConfiguration;
+import org.apache.hop.beam.pipeline.IBeamPipelineTransformHandler;
+import org.apache.hop.core.exception.HopException;
+import org.apache.hop.core.logging.ILogChannel;
+import org.apache.hop.core.row.IRowMeta;
+import org.apache.hop.core.row.JsonRowMeta;
+import org.apache.hop.core.variables.IVariables;
+import org.apache.hop.metadata.api.IHopMetadataProvider;
+import org.apache.hop.pipeline.PipelineMeta;
+import org.apache.hop.pipeline.transform.TransformMeta;
+import org.apache.hop.pipeline.transforms.memgroupby.GAggregate;
+import org.apache.hop.pipeline.transforms.memgroupby.MemoryGroupByMeta;
+
+public class BeamMemoryGroupByTransformHandler extends BeamBaseTransformHandler
+ implements IBeamPipelineTransformHandler {
+
+ @Override
+ public boolean isInput() {
+ return false;
+ }
+
+ @Override
+ public boolean isOutput() {
+ return false;
+ }
+
+ @Override
+ public void handleTransform(
+ ILogChannel log,
+ IVariables variables,
+ String runConfigurationName,
+ IBeamPipelineEngineRunConfiguration runConfiguration,
+ String dataSamplersJson,
+ IHopMetadataProvider metadataProvider,
+ PipelineMeta pipelineMeta,
+ TransformMeta transformMeta,
+ Map<String, PCollection<HopRow>> transformCollectionMap,
+ Pipeline pipeline,
+ IRowMeta rowMeta,
+ List<TransformMeta> previousTransforms,
+ PCollection<HopRow> input,
+ String parentLogChannelId)
+ throws HopException {
+
+ // Don't simply cast but serialize/de-serialize the metadata to prevent classloader exceptions
+ //
+ MemoryGroupByMeta meta = new MemoryGroupByMeta();
+ loadTransformMetadata(meta, transformMeta, metadataProvider, pipelineMeta);
+
+ String[] subjectFields = new String[meta.getAggregates().size()];
+ String[] aggregateCodes = new String[meta.getAggregates().size()];
+
+ for (int i = 0; i < meta.getAggregates().size(); i++) {
+ GAggregate aggregate = meta.getAggregates().get(i);
+
+ aggregateCodes[i] = aggregate.getType().getCode();
+ subjectFields[i] = aggregate.getSubject();
+ }
+
+ List<String> groups = new ArrayList<>();
+ meta.getGroups().forEach(group -> groups.add(group.getField()));
+
+ PTransform<PCollection<HopRow>, PCollection<HopRow>> groupByTransform =
+ new GroupByTransform(
+ transformMeta.getName(),
+ JsonRowMeta.toJson(rowMeta), // The io row
+ groups.toArray(new String[0]),
+ subjectFields,
+ aggregateCodes,
+ new String[] {});
+
+ // Apply the transform to the previous io transform PCollection(s)
+ //
+ PCollection<HopRow> transformPCollection =
+ input.apply(transformMeta.getName(), groupByTransform);
+
+ // Save this in the map
+ //
+ transformCollectionMap.put(transformMeta.getName(), transformPCollection);
+ log.logBasic(
+ "Handled Group By (TRANSFORM) : "
+ + transformMeta.getName()
+ + ", gets data from "
+ + previousTransforms.size()
+ + " previous transform(s)");
+ }
+}
diff --git a/plugins/transforms/memgroupby/pom.xml b/plugins/transforms/memgroupby/pom.xml
index 7fba0f9014..f384d15078 100644
--- a/plugins/transforms/memgroupby/pom.xml
+++ b/plugins/transforms/memgroupby/pom.xml
@@ -47,11 +47,5 @@
<artifactId>commons-math3</artifactId>
<scope>provided</scope>
</dependency>
- <dependency>
- <groupId>org.apache.hop</groupId>
- <artifactId>hop-engine-beam</artifactId>
- <version>${project.version}</version>
- <scope>provided</scope>
- </dependency>
</dependencies>
</project>
diff --git a/plugins/transforms/memgroupby/src/main/java/org/apache/hop/pipeline/transforms/memgroupby/MemoryGroupByMeta.java b/plugins/transforms/memgroupby/src/main/java/org/apache/hop/pipeline/transforms/memgroupby/MemoryGroupByMeta.java
index 74b7289123..11b21d7e9f 100644
--- a/plugins/transforms/memgroupby/src/main/java/org/apache/hop/pipeline/transforms/memgroupby/MemoryGroupByMeta.java
+++ b/plugins/transforms/memgroupby/src/main/java/org/apache/hop/pipeline/transforms/memgroupby/MemoryGroupByMeta.java
@@ -19,23 +19,12 @@ package org.apache.hop.pipeline.transforms.memgroupby;
import java.util.ArrayList;
import java.util.List;
-import java.util.Map;
-import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.hop.beam.core.BeamHop;
-import org.apache.hop.beam.core.HopRow;
-import org.apache.hop.beam.engines.IBeamPipelineEngineRunConfiguration;
-import org.apache.hop.beam.pipeline.IBeamPipelineTransformHandler;
import org.apache.hop.core.CheckResult;
import org.apache.hop.core.ICheckResult;
import org.apache.hop.core.annotations.Transform;
-import org.apache.hop.core.exception.HopException;
import org.apache.hop.core.exception.HopPluginException;
-import org.apache.hop.core.logging.ILogChannel;
import org.apache.hop.core.row.IRowMeta;
import org.apache.hop.core.row.IValueMeta;
-import org.apache.hop.core.row.JsonRowMeta;
import org.apache.hop.core.row.RowMeta;
import org.apache.hop.core.row.value.ValueMetaFactory;
import org.apache.hop.core.row.value.ValueMetaNone;
@@ -47,7 +36,6 @@ import org.apache.hop.metadata.api.IHopMetadataProvider;
import org.apache.hop.pipeline.PipelineMeta;
import org.apache.hop.pipeline.transform.BaseTransformMeta;
import org.apache.hop.pipeline.transform.TransformMeta;
-import org.apache.hop.pipeline.transforms.memgroupby.beam.GroupByTransform;
@Transform(
id = "MemoryGroupBy",
@@ -58,8 +46,7 @@ import org.apache.hop.pipeline.transforms.memgroupby.beam.GroupByTransform;
"i18n:org.apache.hop.pipeline.transform:BaseTransform.Category.Statistics",
keywords = "i18n::MemoryGroupByMeta.keyword",
documentationUrl = "/pipeline/transforms/memgroupby.html")
-public class MemoryGroupByMeta extends BaseTransformMeta<MemoryGroupBy, MemoryGroupByData>
- implements IBeamPipelineTransformHandler {
+public class MemoryGroupByMeta extends BaseTransformMeta<MemoryGroupBy, MemoryGroupByData> {
private static final Class<?> PKG = MemoryGroupByMeta.class;
/** Fields to group over */
@@ -234,94 +221,6 @@ public class MemoryGroupByMeta extends BaseTransformMeta<MemoryGroupBy, MemoryGr
}
}
- @Override
- public boolean isInput() {
- return false;
- }
-
- @Override
- public boolean isOutput() {
- return false;
- }
-
- /**
- * Handle the transform in a Beam pipeline
- *
- * @param log
- * @param variables
- * @param runConfigurationName
- * @param runConfiguration
- * @param dataSamplersJson
- * @param metadataProvider
- * @param pipelineMeta
- * @param transformMeta
- * @param transformCollectionMap
- * @param pipeline
- * @param rowMeta
- * @param previousTransforms
- * @param input
- * @param parentLogChannelId
- * @throws HopException
- */
- @Override
- public void handleTransform(
- ILogChannel log,
- IVariables variables,
- String runConfigurationName,
- IBeamPipelineEngineRunConfiguration runConfiguration,
- String dataSamplersJson,
- IHopMetadataProvider metadataProvider,
- PipelineMeta pipelineMeta,
- TransformMeta transformMeta,
- Map<String, PCollection<HopRow>> transformCollectionMap,
- Pipeline pipeline,
- IRowMeta rowMeta,
- List<TransformMeta> previousTransforms,
- PCollection<HopRow> input,
- String parentLogChannelId)
- throws HopException {
-
- MemoryGroupByMeta meta = new MemoryGroupByMeta();
- BeamHop.loadTransformMetadata(meta, transformMeta, metadataProvider, pipelineMeta);
-
- String[] subjectFields = new String[meta.getAggregates().size()];
- String[] aggregateCodes = new String[meta.getAggregates().size()];
-
- for (int i = 0; i < meta.getAggregates().size(); i++) {
- GAggregate aggregate = meta.getAggregates().get(i);
-
- aggregateCodes[i] = aggregate.getType().getCode();
- subjectFields[i] = aggregate.getSubject();
- }
-
- List<String> groups = new ArrayList<>();
- meta.getGroups().forEach(group -> groups.add(group.getField()));
-
- PTransform<PCollection<HopRow>, PCollection<HopRow>> groupByTransform =
- new GroupByTransform(
- transformMeta.getName(),
- JsonRowMeta.toJson(rowMeta), // The io row
- groups.toArray(new String[0]),
- subjectFields,
- aggregateCodes,
- new String[] {});
-
- // Apply the transform to the previous io transform PCollection(s)
- //
- PCollection<HopRow> transformPCollection =
- input.apply(transformMeta.getName(), groupByTransform);
-
- // Save this in the map
- //
- transformCollectionMap.put(transformMeta.getName(), transformPCollection);
- log.logBasic(
- "Handled Group By (TRANSFORM) : "
- + transformMeta.getName()
- + ", gets data from "
- + previousTransforms.size()
- + " previous transform(s)");
- }
-
@SuppressWarnings("java:S115")
public enum GroupType implements IEnumHasCodeAndDescription {
None("-", "-"),
diff --git a/plugins/transforms/memgroupby/src/main/java/org/apache/hop/pipeline/transforms/memgroupby/beam/AggregationType.java b/plugins/transforms/memgroupby/src/main/java/org/apache/hop/pipeline/transforms/memgroupby/beam/AggregationType.java
deleted file mode 100644
index 7d3236ba68..0000000000
--- a/plugins/transforms/memgroupby/src/main/java/org/apache/hop/pipeline/transforms/memgroupby/beam/AggregationType.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hop.pipeline.transforms.memgroupby.beam;
-
-import org.apache.hop.core.exception.HopException;
-
-public enum AggregationType {
- SUM,
- AVERAGE,
- COUNT_ALL,
- MIN,
- MAX,
- FIRST_INCL_NULL,
- LAST_INCL_NULL,
- FIRST,
- LAST,
- ;
-
- public static final AggregationType getTypeFromName(String name) throws HopException {
- for (AggregationType type : values()) {
- if (name.equals(type.name())) {
- return type;
- }
- }
- throw new HopException("Aggregation type '" + name + "' is not recognized or supported");
- }
-}
diff --git a/plugins/transforms/memgroupby/src/main/java/org/apache/hop/pipeline/transforms/memgroupby/beam/GroupByFn.java b/plugins/transforms/memgroupby/src/main/java/org/apache/hop/pipeline/transforms/memgroupby/beam/GroupByFn.java
deleted file mode 100644
index d7adacf7b9..0000000000
--- a/plugins/transforms/memgroupby/src/main/java/org/apache/hop/pipeline/transforms/memgroupby/beam/GroupByFn.java
+++ /dev/null
@@ -1,286 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hop.pipeline.transforms.memgroupby.beam;
-
-import java.math.BigDecimal;
-import org.apache.beam.sdk.metrics.Counter;
-import org.apache.beam.sdk.metrics.Metrics;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.values.KV;
-import org.apache.hop.beam.core.BeamHop;
-import org.apache.hop.beam.core.HopRow;
-import org.apache.hop.core.exception.HopException;
-import org.apache.hop.core.exception.HopRuntimeException;
-import org.apache.hop.core.row.IRowMeta;
-import org.apache.hop.core.row.IValueMeta;
-import org.apache.hop.core.row.JsonRowMeta;
-import org.apache.hop.core.row.RowDataUtil;
-import org.apache.hop.pipeline.Pipeline;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class GroupByFn extends DoFn<KV<HopRow, Iterable<HopRow>>, HopRow> {
-
- private String counterName;
- private String groupRowMetaJson; // The data types of the group fields
- private String subjectRowMetaJson; // The data types of the subject fields
- private String[] aggregations; // The aggregation types
-
- private static final Logger LOG = LoggerFactory.getLogger(GroupByFn.class);
-
- private transient IRowMeta groupRowMeta;
- private transient IRowMeta subjectRowMeta;
-
- private transient AggregationType[] aggregationTypes = null;
-
- private transient Counter initCounter;
- private transient Counter readCounter;
- private transient Counter writtenCounter;
- private transient Counter errorCounter;
-
- public GroupByFn() {}
-
- public GroupByFn(
- String counterName,
- String groupRowMetaJson,
- String subjectRowMetaJson,
- String[] aggregations) {
- this.counterName = counterName;
- this.groupRowMetaJson = groupRowMetaJson;
- this.subjectRowMetaJson = subjectRowMetaJson;
- this.aggregations = aggregations;
- }
-
- @Setup
- public void setUp() {
- try {
- readCounter = Metrics.counter(Pipeline.METRIC_NAME_READ, counterName);
- writtenCounter = Metrics.counter(Pipeline.METRIC_NAME_WRITTEN, counterName);
- errorCounter = Metrics.counter(Pipeline.METRIC_NAME_ERROR, counterName);
-
- // Initialize Hop Beam
- //
- BeamHop.init();
- groupRowMeta = JsonRowMeta.fromJson(groupRowMetaJson);
- subjectRowMeta = JsonRowMeta.fromJson(subjectRowMetaJson);
- aggregationTypes = new AggregationType[aggregations.length];
- for (int i = 0; i < aggregationTypes.length; i++) {
- aggregationTypes[i] = AggregationType.getTypeFromName(aggregations[i]);
- }
-
- Metrics.counter(Pipeline.METRIC_NAME_INIT, counterName).inc();
- } catch (Exception e) {
- errorCounter.inc();
- LOG.error("Error setup of grouping by ", e);
- throw new HopRuntimeException("Unable setup of group by ", e);
- }
- }
-
- @ProcessElement
- public void processElement(ProcessContext processContext) {
-
- try {
-
- // Get a KV
- //
- KV<HopRow, Iterable<HopRow>> inputElement = processContext.element();
-
- // Get the key row
- //
- HopRow groupHopRow = inputElement.getKey();
- Object[] groupRow = groupHopRow.getRow();
-
- // Initialize the aggregation results for this window
- //
- Object[] results = new Object[aggregationTypes.length];
- long[] counts = new long[aggregationTypes.length];
- for (int i = 0; i < results.length; i++) {
- results[i] = null;
- counts[i] = 0L;
- }
-
- Iterable<HopRow> subjectHopRows = inputElement.getValue();
- for (HopRow subjectHopRow : subjectHopRows) {
- Object[] subjectRow = subjectHopRow.getRow();
- readCounter.inc();
-
- // Aggregate this...
- //
- for (int i = 0; i < aggregationTypes.length; i++) {
- IValueMeta subjectValueMeta = subjectRowMeta.getValueMeta(i);
- Object subject = subjectRow[i];
- Object result = results[i];
-
- switch (aggregationTypes[i]) {
- case AVERAGE:
- // Calculate count AND sum
- // Then correct below
- //
- if (!subjectValueMeta.isNull(subject)) {
- counts[i]++;
- }
- case SUM:
- {
- if (result == null) {
- result = subject;
- } else {
- result =
- switch (subjectValueMeta.getType()) {
- case IValueMeta.TYPE_INTEGER -> (Long) result + (Long) subject;
- case IValueMeta.TYPE_NUMBER -> (Double) result + (Double) subject;
- default ->
- throw new HopException(
- "SUM aggregation not yet implemented for field and data type : "
- + subjectValueMeta.toString());
- };
- }
- }
- break;
- case COUNT_ALL:
- if (subject != null) {
- if (result == null) {
- result = 1L;
- } else {
- result = (Long) result + 1L;
- }
- }
- break;
- case MIN:
- if (subjectValueMeta.isNull(result)) {
- // Previous result was null? Then take the subject
- result = subject;
- } else {
- if (subjectValueMeta.compare(subject, result) < 0) {
- result = subject;
- }
- }
- break;
- case MAX:
- if (subjectValueMeta.isNull(result)) {
- // Previous result was null? Then take the subject
- result = subject;
- } else {
- if (subjectValueMeta.compare(subject, result) > 0) {
- result = subject;
- }
- }
- break;
- case FIRST_INCL_NULL:
- if (counts[i] == 0) {
- counts[i]++;
- result = subject;
- }
- break;
- case LAST_INCL_NULL:
- result = subject;
- break;
- case FIRST:
- if (!subjectValueMeta.isNull(subject) && counts[i] == 0) {
- counts[i]++;
- result = subject;
- }
- break;
- case LAST:
- if (!subjectValueMeta.isNull(subject)) {
- result = subject;
- }
- break;
- default:
- throw new HopException(
- "Sorry, aggregation type yet: "
- + aggregationTypes[i].name()
- + " isn't implemented yet");
- }
- results[i] = result;
- }
- }
-
- // Do a pass to correct average
- //
- for (int i = 0; i < results.length; i++) {
- IValueMeta subjectValueMeta = subjectRowMeta.getValueMeta(i);
- switch (aggregationTypes[i]) {
- case AVERAGE:
- switch (subjectValueMeta.getType()) {
- case IValueMeta.TYPE_NUMBER:
- double dbl = (Double) results[i];
- if (counts[i] != 0) {
- dbl /= counts[i];
- }
- results[i] = dbl;
- break;
- case IValueMeta.TYPE_INTEGER:
- long lng = (Long) results[i];
- if (counts[i] != 0) {
- lng /= counts[i];
- }
- results[i] = lng;
- break;
- case IValueMeta.TYPE_BIGNUMBER:
- BigDecimal bd = (BigDecimal) results[i];
- if (counts[i] != 0) {
- bd = bd.divide(BigDecimal.valueOf(counts[i]));
- }
- results[i] = bd;
- default:
- throw new HopException(
- "Unable to calculate average on data type : " + subjectValueMeta.getTypeDesc());
- }
- }
- }
-
- // Now we have the results
- // Concatenate both group and result...
- //
- Object[] resultRow = RowDataUtil.allocateRowData(groupRowMeta.size() + subjectRowMeta.size());
- int index = 0;
- for (int i = 0; i < groupRowMeta.size(); i++) {
- resultRow[index++] = groupRow[i];
- }
- for (int i = 0; i < subjectRowMeta.size(); i++) {
- resultRow[index++] = results[i];
- }
-
- // Send it on its way
- //
- processContext.output(new HopRow(resultRow));
- writtenCounter.inc();
-
- } catch (Exception e) {
- errorCounter.inc();
- LOG.error("Error grouping by ", e);
- throw new HopRuntimeException("Unable to split row into group and subject ", e);
- }
- }
-
- /**
- * Gets aggregations
- *
- * @return value of aggregations
- */
- public String[] getAggregations() {
- return aggregations;
- }
-
- /**
- * @param aggregations The aggregations to set
- */
- public void setAggregations(String[] aggregations) {
- this.aggregations = aggregations;
- }
-}
diff --git a/plugins/transforms/memgroupby/src/main/java/org/apache/hop/pipeline/transforms/memgroupby/beam/GroupByTransform.java b/plugins/transforms/memgroupby/src/main/java/org/apache/hop/pipeline/transforms/memgroupby/beam/GroupByTransform.java
deleted file mode 100644
index 06080fd696..0000000000
--- a/plugins/transforms/memgroupby/src/main/java/org/apache/hop/pipeline/transforms/memgroupby/beam/GroupByTransform.java
+++ /dev/null
@@ -1,205 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hop.pipeline.transforms.memgroupby.beam;
-
-import org.apache.beam.sdk.metrics.Counter;
-import org.apache.beam.sdk.metrics.Metrics;
-import org.apache.beam.sdk.transforms.GroupByKey;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.hop.beam.core.BeamHop;
-import org.apache.hop.beam.core.HopRow;
-import org.apache.hop.beam.core.fn.HopKeyValueFn;
-import org.apache.hop.core.exception.HopRuntimeException;
-import org.apache.hop.core.row.IRowMeta;
-import org.apache.hop.core.row.JsonRowMeta;
-import org.apache.hop.core.row.RowMeta;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class GroupByTransform extends PTransform<PCollection<HopRow>,
PCollection<HopRow>> {
-
- // The non-transient methods are serializing
- // Keep them simple to stay out of trouble.
- //
- private String transformName;
- private String rowMetaJson; // The input row
- private String[] groupFields; // The fields to group over
- private String[] subjects; // The subjects to aggregate on
- private String[] aggregations; // The aggregation types
- private String[] resultFields; // The result fields
-
- private static final Logger LOG =
LoggerFactory.getLogger(GroupByTransform.class);
- private final Counter numErrors = Metrics.counter("main",
"GroupByTransformErrors");
-
- private transient IRowMeta inputRowMeta;
- private transient IRowMeta groupRowMeta;
- private transient IRowMeta subjectRowMeta;
-
- public GroupByTransform() {}
-
- public GroupByTransform(
- String transformName,
- String rowMetaJson,
- String[] groupFields,
- String[] subjects,
- String[] aggregations,
- String[] resultFields) {
- this.transformName = transformName;
- this.rowMetaJson = rowMetaJson;
- this.groupFields = groupFields;
- this.subjects = subjects;
- this.aggregations = aggregations;
- this.resultFields = resultFields;
- }
-
- @Override
- public PCollection<HopRow> expand(PCollection<HopRow> input) {
- try {
- if (inputRowMeta == null) {
- BeamHop.init();
-
- inputRowMeta = JsonRowMeta.fromJson(rowMetaJson);
-
- groupRowMeta = new RowMeta();
- for (String groupField : groupFields) {
- groupRowMeta.addValueMeta(inputRowMeta.searchValueMeta(groupField));
- }
- subjectRowMeta = new RowMeta();
- for (String subject : subjects) {
- subjectRowMeta.addValueMeta(inputRowMeta.searchValueMeta(subject));
- }
- }
-
- // Split the HopRow into GroupFields-HopRow and SubjectFields-HopRow
- //
- PCollection<KV<HopRow, HopRow>> groupSubjects =
- input.apply(
- ParDo.of(new HopKeyValueFn(rowMetaJson, groupFields, subjects,
transformName)));
-
- // Now we need to aggregate the groups with a Combine
- GroupByKey<HopRow, HopRow> byKey = GroupByKey.<HopRow, HopRow>create();
- PCollection<KV<HopRow, Iterable<HopRow>>> grouped =
groupSubjects.apply(byKey);
-
- // Aggregate the rows in the grouped PCollection
- // Input: KV<HopRow>, Iterable<HopRow>>
- // This means that The group rows is in HopRow. For every one of
these, you get a list of
- // subject rows.
- // We need to calculate the aggregation of these subject lists
- // Then we output group values with result values behind it.
- //
- String counterName = transformName + " AGG";
- PCollection<HopRow> output =
- grouped.apply(
- ParDo.of(
- new GroupByFn(
- counterName,
- JsonRowMeta.toJson(groupRowMeta),
- JsonRowMeta.toJson(subjectRowMeta),
- aggregations)));
-
- return output;
- } catch (Exception e) {
- numErrors.inc();
- LOG.error("Error in group by transform", e);
- throw new HopRuntimeException("Error in group by transform", e);
- }
- }
-
- /**
- * Gets inputRowMetaJson
- *
- * @return value of inputRowMetaJson
- */
- public String getRowMetaJson() {
- return rowMetaJson;
- }
-
- /**
- * @param rowMetaJson The inputRowMetaJson to set
- */
- public void setRowMetaJson(String rowMetaJson) {
- this.rowMetaJson = rowMetaJson;
- }
-
- /**
- * Gets groupFields
- *
- * @return value of groupFields
- */
- public String[] getGroupFields() {
- return groupFields;
- }
-
- /**
- * @param groupFields The groupFields to set
- */
- public void setGroupFields(String[] groupFields) {
- this.groupFields = groupFields;
- }
-
- /**
- * Gets subjects
- *
- * @return value of subjects
- */
- public String[] getSubjects() {
- return subjects;
- }
-
- /**
- * @param subjects The subjects to set
- */
- public void setSubjects(String[] subjects) {
- this.subjects = subjects;
- }
-
- /**
- * Gets aggregations
- *
- * @return value of aggregations
- */
- public String[] getAggregations() {
- return aggregations;
- }
-
- /**
- * @param aggregations The aggregations to set
- */
- public void setAggregations(String[] aggregations) {
- this.aggregations = aggregations;
- }
-
- /**
- * Gets resultFields
- *
- * @return value of resultFields
- */
- public String[] getResultFields() {
- return resultFields;
- }
-
- /**
- * @param resultFields The resultFields to set
- */
- public void setResultFields(String[] resultFields) {
- this.resultFields = resultFields;
- }
-}
diff --git a/pom.xml b/pom.xml
index 46122baf0b..40a5225210 100644
--- a/pom.xml
+++ b/pom.xml
@@ -97,7 +97,7 @@
</distributionManagement>
<properties>
- <apache-beam.version>2.71.0</apache-beam.version>
+ <apache-beam.version>2.74.0</apache-beam.version>
<assembly_appendId>false</assembly_appendId>
<assembly_package-phase>package</assembly_package-phase>
<attach-sources-phase>verify</attach-sources-phase>
@@ -674,7 +674,6 @@
<modules>
<module>core</module>
<module>engine</module>
- <module>engine-beam</module>
<module>lib</module>
<module>lib-jdbc</module>
<module>lib-p2</module>
@@ -905,7 +904,6 @@
<excludes>hop-plugins/**.zip</excludes>
<excludes>hop-core/**.zip</excludes>
<excludes>hop-engine/**.zip</excludes>
- <excludes>hop-engine-beam/**.zip</excludes>
<excludes>hop-ui-rap/**.zip</excludes>
<excludes>hop-ui-rcp/**.zip</excludes>
<excludes>hop-rest/**.zip</excludes>
diff --git a/rest/pom.xml b/rest/pom.xml
index d961516aae..03677f3732 100644
--- a/rest/pom.xml
+++ b/rest/pom.xml
@@ -87,21 +87,6 @@
</exclusion>
</exclusions>
</dependency>
- <dependency>
- <groupId>org.apache.hop</groupId>
- <artifactId>hop-engine-beam</artifactId>
- <version>${project.version}</version>
- <exclusions>
- <exclusion>
- <groupId>org.glassfish.jersey.connectors</groupId>
- <artifactId>*</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.glassfish.jersey.core</groupId>
- <artifactId>*</artifactId>
- </exclusion>
- </exclusions>
- </dependency>
<dependency>
<groupId>org.apache.hop</groupId>
<artifactId>hop-ui</artifactId>