This is an automated email from the ASF dual-hosted git repository.

johncasey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 4681486571e Set connection attributes in SingleStoreIO (#27487)
4681486571e is described below

commit 4681486571e943b1dce681c75bd579fe33affdb2
Author: AdalbertMemSQL <[email protected]>
AuthorDate: Tue Aug 8 22:45:32 2023 +0300

    Set connection attributes in SingleStoreIO (#27487)
    
    * check point
    
    * Updated S2 version
    
    * Added test
    
    * Reformatted code
    
    * Deleted debug output
    
    * Deleted unused imports
---
 .../job_PostCommit_Java_SingleStoreIO_IT.groovy    |  1 +
 .../kubernetes/singlestore/sdb-cluster.yaml        |  2 +-
 .../beam/sdk/io/singlestore/SingleStoreIO.java     |  7 ++
 .../SingleStoreIOConnectionAttributesIT.java       | 92 ++++++++++++++++++++++
 4 files changed, 101 insertions(+), 1 deletion(-)

diff --git a/.test-infra/jenkins/job_PostCommit_Java_SingleStoreIO_IT.groovy 
b/.test-infra/jenkins/job_PostCommit_Java_SingleStoreIO_IT.groovy
index f43da5a0531..901b7636433 100644
--- a/.test-infra/jenkins/job_PostCommit_Java_SingleStoreIO_IT.groovy
+++ b/.test-infra/jenkins/job_PostCommit_Java_SingleStoreIO_IT.groovy
@@ -81,6 +81,7 @@ PostcommitJobBuilder.postCommitJob(jobName,
           switches("-DintegrationTestRunner=dataflow")
           tasks(":sdks:java:io:singlestore:integrationTest --tests 
org.apache.beam.sdk.io.singlestore.SingleStoreIODefaultMapperIT")
           tasks(":sdks:java:io:singlestore:integrationTest --tests 
org.apache.beam.sdk.io.singlestore.SingleStoreIOSchemaTransformIT")
+          tasks(":sdks:java:io:singlestore:integrationTest --tests 
org.apache.beam.sdk.io.singlestore.SingleStoreIOConnectionAttributesIT")
         }
       }
     }
diff --git a/.test-infra/kubernetes/singlestore/sdb-cluster.yaml 
b/.test-infra/kubernetes/singlestore/sdb-cluster.yaml
index 655a8bfb524..8acb0b7a7f7 100644
--- a/.test-infra/kubernetes/singlestore/sdb-cluster.yaml
+++ b/.test-infra/kubernetes/singlestore/sdb-cluster.yaml
@@ -22,7 +22,7 @@ spec:
   adminHashedPassword: "*9177CC8207174BDBB5ED66B2140C75171283F15D"
   nodeImage:
     repository: singlestore/node
-    tag: alma-7.5.22-8b82f8a84e
+    tag: latest
 
   redundancyLevel: 1
 
diff --git 
a/sdks/java/io/singlestore/src/main/java/org/apache/beam/sdk/io/singlestore/SingleStoreIO.java
 
b/sdks/java/io/singlestore/src/main/java/org/apache/beam/sdk/io/singlestore/SingleStoreIO.java
index de2a6805278..b8c088d7ac0 100644
--- 
a/sdks/java/io/singlestore/src/main/java/org/apache/beam/sdk/io/singlestore/SingleStoreIO.java
+++ 
b/sdks/java/io/singlestore/src/main/java/org/apache/beam/sdk/io/singlestore/SingleStoreIO.java
@@ -53,6 +53,7 @@ import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
 import org.apache.beam.sdk.util.Preconditions;
+import org.apache.beam.sdk.util.ReleaseInfo;
 import org.apache.beam.sdk.values.PBegin;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionView;
@@ -347,6 +348,12 @@ public class SingleStoreIO {
       String connectionProperties =
           SingleStoreUtil.getArgumentWithDefault(getConnectionProperties(), 
"");
       connectionProperties += (connectionProperties.isEmpty() ? "" : ";") + 
"allowLocalInfile=TRUE";
+      connectionProperties +=
+          String.format(
+              
";connectionAttributes=_connector_name:%s,_connector_version:%s,_product_version:%s",
+              "Apache Beam SingleStoreDB I/O",
+              ReleaseInfo.getReleaseInfo().getVersion(),
+              ReleaseInfo.getReleaseInfo().getVersion());
       String username = getUsername();
       String password = getPassword();
 
diff --git 
a/sdks/java/io/singlestore/src/test/java/org/apache/beam/sdk/io/singlestore/SingleStoreIOConnectionAttributesIT.java
 
b/sdks/java/io/singlestore/src/test/java/org/apache/beam/sdk/io/singlestore/SingleStoreIOConnectionAttributesIT.java
new file mode 100644
index 00000000000..6023ae00d4f
--- /dev/null
+++ 
b/sdks/java/io/singlestore/src/test/java/org/apache/beam/sdk/io/singlestore/SingleStoreIOConnectionAttributesIT.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.singlestore;
+
+import static 
org.apache.beam.sdk.io.common.IOITHelper.readIOTestPipelineOptions;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.Statement;
+import java.util.HashMap;
+import java.util.Map;
+import javax.sql.DataSource;
+import org.apache.beam.sdk.util.ReleaseInfo;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+@RunWith(JUnit4.class)
+public class SingleStoreIOConnectionAttributesIT {
+  private static String serverName;
+
+  private static String username;
+
+  private static String password;
+
+  private static Integer port;
+
+  @BeforeClass
+  public static void setup() throws Exception {
+    SingleStoreIOTestPipelineOptions options;
+    try {
+      options = 
readIOTestPipelineOptions(SingleStoreIOTestPipelineOptions.class);
+    } catch (IllegalArgumentException e) {
+      options = null;
+    }
+    org.junit.Assume.assumeNotNull(options);
+
+    serverName = options.getSingleStoreServerName();
+    username = options.getSingleStoreUsername();
+    password = options.getSingleStorePassword();
+    port = options.getSingleStorePort();
+  }
+
+  @Test
+  public void connectionAttributes() throws Exception {
+    Map<String, String> attributes = new HashMap<String, String>();
+    attributes.put("_connector_name", "Apache Beam SingleStoreDB I/O");
+    attributes.put("_connector_version", 
ReleaseInfo.getReleaseInfo().getVersion());
+    attributes.put("_product_version", 
ReleaseInfo.getReleaseInfo().getVersion());
+
+    SingleStoreIO.DataSourceConfiguration dataSourceConfiguration =
+        SingleStoreIO.DataSourceConfiguration.create(serverName + ":" + port)
+            .withPassword(password)
+            .withUsername(username);
+
+    DataSource dataSource = dataSourceConfiguration.getDataSource();
+
+    try (Connection conn = dataSource.getConnection();
+        Statement stmt = conn.createStatement();
+        ResultSet rs =
+            stmt.executeQuery("select * from 
information_schema.mv_connection_attributes"); ) {
+      while (rs.next()) {
+        String attribute = rs.getString(3);
+        String value = rs.getString(4);
+        if (attributes.containsKey(attribute)) {
+          assertEquals(attributes.get(attribute), value);
+          attributes.remove(attribute);
+        }
+      }
+    }
+
+    assertTrue(attributes.isEmpty());
+  }
+}

Reply via email to