This is an automated email from the ASF dual-hosted git repository.
johncasey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 4681486571e Set connection attributes in SingleStoreIO (#27487)
4681486571e is described below
commit 4681486571e943b1dce681c75bd579fe33affdb2
Author: AdalbertMemSQL <[email protected]>
AuthorDate: Tue Aug 8 22:45:32 2023 +0300
Set connection attributes in SingleStoreIO (#27487)
* check point
* Updated S2 version
* Added test
* Reformatted code
* Deleted debug output
* Deleted unused imports
---
.../job_PostCommit_Java_SingleStoreIO_IT.groovy | 1 +
.../kubernetes/singlestore/sdb-cluster.yaml | 2 +-
.../beam/sdk/io/singlestore/SingleStoreIO.java | 7 ++
.../SingleStoreIOConnectionAttributesIT.java | 92 ++++++++++++++++++++++
4 files changed, 101 insertions(+), 1 deletion(-)
diff --git a/.test-infra/jenkins/job_PostCommit_Java_SingleStoreIO_IT.groovy b/.test-infra/jenkins/job_PostCommit_Java_SingleStoreIO_IT.groovy
index f43da5a0531..901b7636433 100644
--- a/.test-infra/jenkins/job_PostCommit_Java_SingleStoreIO_IT.groovy
+++ b/.test-infra/jenkins/job_PostCommit_Java_SingleStoreIO_IT.groovy
@@ -81,6 +81,7 @@ PostcommitJobBuilder.postCommitJob(jobName,
switches("-DintegrationTestRunner=dataflow")
tasks(":sdks:java:io:singlestore:integrationTest --tests org.apache.beam.sdk.io.singlestore.SingleStoreIODefaultMapperIT")
tasks(":sdks:java:io:singlestore:integrationTest --tests org.apache.beam.sdk.io.singlestore.SingleStoreIOSchemaTransformIT")
+ tasks(":sdks:java:io:singlestore:integrationTest --tests org.apache.beam.sdk.io.singlestore.SingleStoreIOConnectionAttributesIT")
}
}
}
diff --git a/.test-infra/kubernetes/singlestore/sdb-cluster.yaml b/.test-infra/kubernetes/singlestore/sdb-cluster.yaml
index 655a8bfb524..8acb0b7a7f7 100644
--- a/.test-infra/kubernetes/singlestore/sdb-cluster.yaml
+++ b/.test-infra/kubernetes/singlestore/sdb-cluster.yaml
@@ -22,7 +22,7 @@ spec:
adminHashedPassword: "*9177CC8207174BDBB5ED66B2140C75171283F15D"
nodeImage:
repository: singlestore/node
- tag: alma-7.5.22-8b82f8a84e
+ tag: latest
redundancyLevel: 1
diff --git a/sdks/java/io/singlestore/src/main/java/org/apache/beam/sdk/io/singlestore/SingleStoreIO.java b/sdks/java/io/singlestore/src/main/java/org/apache/beam/sdk/io/singlestore/SingleStoreIO.java
index de2a6805278..b8c088d7ac0 100644
--- a/sdks/java/io/singlestore/src/main/java/org/apache/beam/sdk/io/singlestore/SingleStoreIO.java
+++ b/sdks/java/io/singlestore/src/main/java/org/apache/beam/sdk/io/singlestore/SingleStoreIO.java
@@ -53,6 +53,7 @@ import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.util.Preconditions;
+import org.apache.beam.sdk.util.ReleaseInfo;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionView;
@@ -347,6 +348,12 @@ public class SingleStoreIO {
String connectionProperties =
SingleStoreUtil.getArgumentWithDefault(getConnectionProperties(), "");
connectionProperties += (connectionProperties.isEmpty() ? "" : ";") + "allowLocalInfile=TRUE";
+ connectionProperties +=
+ String.format(
+ ";connectionAttributes=_connector_name:%s,_connector_version:%s,_product_version:%s",
+ "Apache Beam SingleStoreDB I/O",
+ ReleaseInfo.getReleaseInfo().getVersion(),
+ ReleaseInfo.getReleaseInfo().getVersion());
String username = getUsername();
String password = getPassword();
diff --git a/sdks/java/io/singlestore/src/test/java/org/apache/beam/sdk/io/singlestore/SingleStoreIOConnectionAttributesIT.java b/sdks/java/io/singlestore/src/test/java/org/apache/beam/sdk/io/singlestore/SingleStoreIOConnectionAttributesIT.java
new file mode 100644
index 00000000000..6023ae00d4f
--- /dev/null
+++ b/sdks/java/io/singlestore/src/test/java/org/apache/beam/sdk/io/singlestore/SingleStoreIOConnectionAttributesIT.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.singlestore;
+
+import static org.apache.beam.sdk.io.common.IOITHelper.readIOTestPipelineOptions;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.Statement;
+import java.util.HashMap;
+import java.util.Map;
+import javax.sql.DataSource;
+import org.apache.beam.sdk.util.ReleaseInfo;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+@RunWith(JUnit4.class)
+public class SingleStoreIOConnectionAttributesIT {
+ private static String serverName;
+
+ private static String username;
+
+ private static String password;
+
+ private static Integer port;
+
+ @BeforeClass
+ public static void setup() throws Exception {
+ SingleStoreIOTestPipelineOptions options;
+ try {
+ options = readIOTestPipelineOptions(SingleStoreIOTestPipelineOptions.class);
+ } catch (IllegalArgumentException e) {
+ options = null;
+ }
+ org.junit.Assume.assumeNotNull(options);
+
+ serverName = options.getSingleStoreServerName();
+ username = options.getSingleStoreUsername();
+ password = options.getSingleStorePassword();
+ port = options.getSingleStorePort();
+ }
+
+ @Test
+ public void connectionAttributes() throws Exception {
+ Map<String, String> attributes = new HashMap<String, String>();
+ attributes.put("_connector_name", "Apache Beam SingleStoreDB I/O");
+ attributes.put("_connector_version", ReleaseInfo.getReleaseInfo().getVersion());
+ attributes.put("_product_version", ReleaseInfo.getReleaseInfo().getVersion());
+
+ SingleStoreIO.DataSourceConfiguration dataSourceConfiguration =
+ SingleStoreIO.DataSourceConfiguration.create(serverName + ":" + port)
+ .withPassword(password)
+ .withUsername(username);
+
+ DataSource dataSource = dataSourceConfiguration.getDataSource();
+
+ try (Connection conn = dataSource.getConnection();
+ Statement stmt = conn.createStatement();
+ ResultSet rs =
+ stmt.executeQuery("select * from information_schema.mv_connection_attributes"); ) {
+ while (rs.next()) {
+ String attribute = rs.getString(3);
+ String value = rs.getString(4);
+ if (attributes.containsKey(attribute)) {
+ assertEquals(attributes.get(attribute), value);
+ attributes.remove(attribute);
+ }
+ }
+ }
+
+ assertTrue(attributes.isEmpty());
+ }
+}