mattcasters commented on a change in pull request #15916:
URL: https://github.com/apache/beam/pull/15916#discussion_r749358011



##########
File path: 
sdks/java/io/neo4j/src/main/java/org/apache/beam/sdk/io/neo4j/Neo4jIO.java
##########
@@ -0,0 +1,1346 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.neo4j;
+
+import static 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
+
+import com.google.auto.value.AutoValue;
+import java.io.Serializable;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+import java.util.logging.Level;
+import org.apache.beam.repackaged.core.org.apache.commons.lang3.StringUtils;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.harness.JvmInitializer;
+import org.apache.beam.sdk.options.ValueProvider;
+import org.apache.beam.sdk.schemas.NoSuchSchemaException;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.SchemaRegistry;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.transforms.display.HasDisplayData;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PDone;
+import org.apache.beam.sdk.values.TypeDescriptor;
+import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.neo4j.driver.AuthTokens;
+import org.neo4j.driver.Config;
+import org.neo4j.driver.Driver;
+import org.neo4j.driver.GraphDatabase;
+import org.neo4j.driver.Logging;
+import org.neo4j.driver.Record;
+import org.neo4j.driver.Result;
+import org.neo4j.driver.Session;
+import org.neo4j.driver.SessionConfig;
+import org.neo4j.driver.TransactionConfig;
+import org.neo4j.driver.TransactionWork;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This is a Beam IO to read from, and write data to, Neo4j.
+ *
+ * <p>
+ *
+ * <p>
+ *
+ * <h3>Driver configuration</h3>
+ *
+ * <p>To read from or write to Neo4j you have to provide a {@link 
DriverConfiguration} using<br>
+ * 1. {@link DriverConfiguration#create()} (which must be {@link 
Serializable});<br>
+ * 2. or {@link DriverConfiguration#create(String, String, String)} (URL, 
username and password).
+ *
+ * <p>If you have trouble connecting to a Neo4j Aura database please try to 
disable a few security
+ * algorithms in your JVM. This makes sure that the right one is picked to 
connect:
+ *
+ * <p>
+ *
+ * <pre>{@code
+ * Security.setProperty(
+ *       "jdk.tls.disabledAlgorithms",
+ *       "SSLv3, RC4, DES, MD5withRSA, DH keySize < 1024, EC keySize < 224, 
3DES_EDE_CBC, anon, NULL");
+ * }</pre>
+ *
+ * <p>
+ *
+ * <p>
+ *
+ * <p>To execute this code on GCP Dataflow you can create a class which 
extends {@link
+ * JvmInitializer} and implement the {@link JvmInitializer#onStartup()} 
method. You need to annotate
+ * this new class with {@link com.google.auto.service.AutoService}
+ *
+ * <pre>{@code
+ * @AutoService(value = JvmInitializer.class)
+ * }</pre>
+ *
+ * <p>
+ *
+ * <h3>Reading from Neo4j</h3>
+ *
+ * <p>{@link Neo4jIO#readAll()} source returns a bounded collection of {@code 
OutputT} as a {@code
+ * PCollection<OutputT>}. OutputT is the type returned by the provided {@link 
RowMapper}. It accepts
+ * parameters as input in the form of {@code ParameterT} as a {@code 
PCollection<ParameterT>}
+ *
+ * <p>The following example reads ages to return the IDs of Person nodes. It 
runs a Cypher query for
+ * each provided age.
+ *
+ * <p>The mapping {@link SerializableFunction} maps input values to each 
execution of the Cypher
+ * statement. In the function simply return a map containing the parameters 
you want to set.
+ *
+ * <p>The {@link RowMapper} converts output Neo4j {@link Record} values to the 
output of the source.
+ *
+ * <pre>{@code
+ * pipeline
+ *   .apply(Create.of(40, 50, 60))
+ *   .apply(Neo4jIO.<Integer, String>readAll()
+ *     
.withDriverConfiguration(Neo4jIO.DriverConfiguration.create("neo4j://localhost:7687",
 "neo4j", "password"))
+ *     .withCypher("MATCH(n:Person) WHERE n.age = $age RETURN n.id")
+ *     .withReadTransaction()
+ *     .withCoder(StringUtf8Coder.of())
+ *     .withParametersFunction( age -> Collections.singletonMap( "age", age ))
+ *     .withRowMapper( record -> record.get(0).asString() )
+ *   );
+ * }</pre>
+ *
+ * <h3>Writing to Neo4j</h3>
+ *
+ * <p>Neo4j sink supports writing data to a graph. It writes a {@link 
PCollection} to the graph by
+ * collecting a batch of rows after which all rows in this batch are written 
together to Neo4j. This
+ * is done using the {@link WriteUnwind} sink transform.
+ *
+ * <p>Like the source, to configure the sink, you have to provide a {@link 
DriverConfiguration}.
+ *
+ * <p>In the following example we'll merge a collection of {@link 
org.apache.beam.sdk.values.Row}
+ * into Person nodes. Since this is a Sink it has no output and as such no 
RowMapper is needed. The
+ * rows are being used as a container for the parameters of the Cypher 
statement. The used Cypher in
+ * question needs to be an UNWIND statement. Like in the read case, the 
parameters {@link
+ * SerializableFunction} converts parameter values to a {@code Map<String, 
Object>}. The difference
+ * here is that the resulting Map is stored in a {@link List} (containing 
maps) which in turn is
+ * stored in another Map under the name provided by the {@link
+ * WriteUnwind#withUnwindMapName(String)} method. All of this is handled 
automatically. You do need
+ * to provide the unwind map name so that you can reference that in the UNWIND 
statement.
+ *
+ * <p>
+ *
+ * <p>For example:
+ *
+ * <pre>{@code
+ * pipeline
+ *   .apply(...)
+ *   .apply(Neo4jIO.<Row>writeUnwind()
+ *      
.withDriverConfiguration(Neo4jIO.DriverConfiguration.create("neo4j://localhost:7687",
 "neo4j", "password"))
+ *      .withUnwindMapName("rows")
+ *      .withCypher("UNWIND $rows AS row MERGE(n:Person { id : row.id } ) SET 
n.firstName = row.first, n.lastName = row.last")
+ *      .withParametersFunction( row -> ImmutableMap.of(
+ *        "id", row.getString("id"),
+ *        "first", row.getString("firstName"),
+ *        "last", row.getString("lastName")))
+ *    );
+ * }</pre>
+ */
+@Experimental(Experimental.Kind.SOURCE_SINK)
+public class Neo4jIO {
+
+  private static final Logger LOG = LoggerFactory.getLogger(Neo4jIO.class);
+
+  /**
+   * Read all rows using a Neo4j Cypher query.
+   *
+   * @param <ParameterT> Type of the data representing query parameters.
+   * @param <OutputT> Type of the data to be read.
+   */
+  public static <ParameterT, OutputT> ReadAll<ParameterT, OutputT> readAll() {
+    return new AutoValue_Neo4jIO_ReadAll.Builder<ParameterT, OutputT>()
+        
.setFetchSize(ValueProvider.StaticValueProvider.of(Config.defaultConfig().fetchSize()))
+        .build();
+  }
+
+  /**
+   * Write all rows using a Neo4j Cypher UNWIND cypher statement. This sets a 
default batch
+   * size of 5000.
+   *
+   * @param <ParameterT> Type of the data representing query parameters.
+   */
+  public static <ParameterT> WriteUnwind<ParameterT> writeUnwind() {
+    return new AutoValue_Neo4jIO_WriteUnwind.Builder<ParameterT>()
+        .setBatchSize(ValueProvider.StaticValueProvider.of(5000L))
+        .build();
+  }
+
+  private static <ParameterT, OutputT> PCollection<OutputT> 
getOutputPCollection(
+      PCollection<ParameterT> input, DoFn<ParameterT, OutputT> writeFn, 
Coder<OutputT> coder) {
+    PCollection<OutputT> output = 
input.apply(ParDo.of(writeFn)).setCoder(coder);
+
+    try {
+      TypeDescriptor<OutputT> typeDesc = coder.getEncodedTypeDescriptor();
+      SchemaRegistry registry = input.getPipeline().getSchemaRegistry();
+      Schema schema = registry.getSchema(typeDesc);
+      output.setSchema(
+          schema,
+          typeDesc,
+          registry.getToRowFunction(typeDesc),
+          registry.getFromRowFunction(typeDesc));
+    } catch (NoSuchSchemaException e) {
+      // ignore
+    }
+    return output;
+  }
+
+  /**
+   * An interface used by {@link ReadAll} for converting each row of a Neo4j 
{@link Result} record
+   * {@link Record} into an element of the resulting {@link PCollection}.
+   */
+  @FunctionalInterface
+  public interface RowMapper<T> extends Serializable {
+    T mapRow(Record record) throws Exception;
+  }
+
+  /** This describes all the information needed to create a Neo4j {@link 
Session}. */
+  @AutoValue
+  public abstract static class DriverConfiguration implements Serializable {
+    public static DriverConfiguration create() {
+      return new AutoValue_Neo4jIO_DriverConfiguration.Builder().build();
+    }
+
+    public static DriverConfiguration create(String url, String username, 
String password) {
+      checkArgument(url != null, "url can not be null");
+      checkArgument(username != null, "username can not be null");
+      checkArgument(password != null, "password can not be null");
+      return new AutoValue_Neo4jIO_DriverConfiguration.Builder()
+          .build()
+          .withUrl(url)
+          .withUsername(username)
+          .withPassword(password);
+    }
+
+    abstract @Nullable ValueProvider<String> getUrl();
+
+    abstract @Nullable ValueProvider<List<String>> getUrls();
+
+    abstract @Nullable ValueProvider<String> getUsername();
+
+    abstract @Nullable ValueProvider<String> getPassword();
+
+    abstract @Nullable ValueProvider<Boolean> getEncryption();
+
+    abstract @Nullable ValueProvider<Long> 
getConnectionLivenessCheckTimeoutMs();
+
+    abstract @Nullable ValueProvider<Long> getMaxConnectionLifetimeMs();
+
+    abstract @Nullable ValueProvider<Integer> getMaxConnectionPoolSize();
+
+    abstract @Nullable ValueProvider<Long> getConnectionAcquisitionTimeoutMs();
+
+    abstract @Nullable ValueProvider<Long> getConnectionTimeoutMs();
+
+    abstract @Nullable ValueProvider<Long> getMaxTransactionRetryTimeMs();
+
+    abstract @Nullable ValueProvider<Boolean> getRouting();
+
+    abstract Builder builder();
+
+    // URL
+    public DriverConfiguration withUrl(String url) {
+      return withUrl(ValueProvider.StaticValueProvider.of(url));
+    }
+
+    public DriverConfiguration withUrl(ValueProvider<String> url) {
+      Preconditions.checkArgument(
+          url != null, "a neo4j connection URL can not be empty or null", url);
+      Preconditions.checkArgument(
+          StringUtils.isNotEmpty(url.get()),
+          "a neo4j connection URL can not be empty or null",
+          url);
+      return builder().setUrl(url).build();
+    }
+
+    // URLS
+    public DriverConfiguration withUrls(List<String> urls) {
+      return withUrls(ValueProvider.StaticValueProvider.of(urls));
+    }
+
+    public DriverConfiguration withUrls(ValueProvider<List<String>> urls) {
+      Preconditions.checkArgument(
+          urls != null, "a list of neo4j connection URLs can not be empty or 
null", urls);
+      Preconditions.checkArgument(
+          urls.get() != null && !urls.get().isEmpty(),
+          "a neo4j connection URL can not be empty or null",
+          urls);
+      return builder().setUrls(urls).build();
+    }
+
+    // Encryption
+    public DriverConfiguration withEncryption() {
+      return 
builder().setEncryption(ValueProvider.StaticValueProvider.of(Boolean.TRUE)).build();
+    }
+
+    public DriverConfiguration withoutEncryption() {
+      return 
builder().setEncryption(ValueProvider.StaticValueProvider.of(Boolean.FALSE)).build();
+    }
+
+    // Connection Liveness Check Timeout
+    public DriverConfiguration withConnectionLivenessCheckTimeoutMs(
+        long connectionLivenessCheckTimeoutMs) {
+      return withConnectionLivenessCheckTimeoutMs(
+          
ValueProvider.StaticValueProvider.of(connectionLivenessCheckTimeoutMs));
+    }
+
+    public DriverConfiguration withConnectionLivenessCheckTimeoutMs(
+        ValueProvider<Long> connectionLivenessCheckTimeoutMs) {
+      return builder()
+          
.setConnectionLivenessCheckTimeoutMs(connectionLivenessCheckTimeoutMs)
+          .build();
+    }
+
+    // Maximum Connection Lifetime
+    public DriverConfiguration withMaxConnectionLifetimeMs(long 
maxConnectionLifetimeMs) {
+      return withMaxConnectionLifetimeMs(
+          ValueProvider.StaticValueProvider.of(maxConnectionLifetimeMs));
+    }
+
+    public DriverConfiguration withMaxConnectionLifetimeMs(
+        ValueProvider<Long> maxConnectionLifetimeMs) {
+      return 
builder().setMaxConnectionLifetimeMs(maxConnectionLifetimeMs).build();
+    }
+
+    // Maximum Connection pool size
+    public DriverConfiguration withMaxConnectionPoolSize(int 
maxConnectionPoolSize) {
+      return 
withMaxConnectionPoolSize(ValueProvider.StaticValueProvider.of(maxConnectionPoolSize));
+    }
+
+    public DriverConfiguration withMaxConnectionPoolSize(
+        ValueProvider<Integer> maxConnectionPoolSize) {
+      return builder().setMaxConnectionPoolSize(maxConnectionPoolSize).build();
+    }
+
+    // Connection Acq Timeout
+    public DriverConfiguration withConnectionAcquisitionTimeoutMs(
+        long connectionAcquisitionTimeoutMs) {
+      return withConnectionAcquisitionTimeoutMs(
+          
ValueProvider.StaticValueProvider.of(connectionAcquisitionTimeoutMs));
+    }
+
+    public DriverConfiguration withConnectionAcquisitionTimeoutMs(
+        ValueProvider<Long> connectionAcquisitionTimeoutMs) {
+      return 
builder().setConnectionAcquisitionTimeoutMs(connectionAcquisitionTimeoutMs).build();
+    }
+
+    // Connection Timeout
+    public DriverConfiguration withConnectionTimeoutMs(long 
connectionTimeoutMs) {
+      return 
withConnectionTimeoutMs(ValueProvider.StaticValueProvider.of(connectionTimeoutMs));
+    }
+
+    public DriverConfiguration withConnectionTimeoutMs(ValueProvider<Long> 
connectionTimeoutMs) {
+      return builder().setConnectionTimeoutMs(connectionTimeoutMs).build();
+    }
+
+    // Maximum Transaction Retry Time
+    public DriverConfiguration withMaxTransactionRetryTimeMs(long 
maxTransactionRetryTimeMs) {
+      return withMaxTransactionRetryTimeMs(
+          ValueProvider.StaticValueProvider.of(maxTransactionRetryTimeMs));
+    }
+
+    public DriverConfiguration withMaxTransactionRetryTimeMs(
+        ValueProvider<Long> maxTransactionRetryTimeMs) {
+      return 
builder().setMaxTransactionRetryTimeMs(maxTransactionRetryTimeMs).build();
+    }
+
+    public DriverConfiguration withUsername(String username) {
+      return withUsername(ValueProvider.StaticValueProvider.of(username));
+    }
+
+    public DriverConfiguration withUsername(ValueProvider<String> username) {
+      Preconditions.checkArgument(username != null, "neo4j username can not be 
null", username);
+      Preconditions.checkArgument(
+          username.get() != null, "neo4j username can not be null", username);
+      return builder().setUsername(username).build();
+    }
+
+    public DriverConfiguration withPassword(String password) {
+      return withPassword(ValueProvider.StaticValueProvider.of(password));
+    }
+
+    public DriverConfiguration withPassword(ValueProvider<String> password) {
+      Preconditions.checkArgument(password != null, "neo4j password can not be 
null", password);
+      Preconditions.checkArgument(
+          password.get() != null, "neo4j password can not be null", password);
+      return builder().setPassword(password).build();
+    }
+
+    // Routing
+    public DriverConfiguration withRouting() {
+      return 
builder().setRouting(ValueProvider.StaticValueProvider.of(Boolean.TRUE)).build();
+    }
+
+    public DriverConfiguration withoutRouting() {
+      return 
builder().setRouting(ValueProvider.StaticValueProvider.of(Boolean.FALSE)).build();
+    }
+
+    void populateDisplayData(DisplayData.Builder builder) {
+      builder.addIfNotNull(DisplayData.item("neo4j-url", getUrl()));
+      builder.addIfNotNull(DisplayData.item("neo4j-username", getUsername()));
+      builder.addIfNotNull(
+          DisplayData.item(
+              "neo4j-password", getPassword() != null ? "<provided>" : 
"<not-provided>"));
+      builder.addIfNotNull(
+          DisplayData.item(
+              "neo4j-encryption", getEncryption() != null ? "<provided>" : 
"<not-provided>"));
+    }
+
+    Driver buildDriver() {
+      // Create the Neo4j Driver
+      // The default is: have the driver make the determination
+      //
+      Config.ConfigBuilder configBuilder = Config.builder();

Review comment:
       FWIW I took a quick stab at making Neo4j Config Serializable but it 
didn't pan out.  My colleagues at Neo4j will pick up the task but it's 
apparently not realistically something that can be fixed in the short term.  
   https://github.com/neo4j/neo4j-java-driver/issues/1081




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to