[
https://issues.apache.org/jira/browse/BEAM-1857?focusedWorklogId=678984&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-678984
]
ASF GitHub Bot logged work on BEAM-1857:
----------------------------------------
Author: ASF GitHub Bot
Created on: 09/Nov/21 11:33
Start Date: 09/Nov/21 11:33
Worklog Time Spent: 10m
Work Description: mosche commented on a change in pull request #15916:
URL: https://github.com/apache/beam/pull/15916#discussion_r745529126
##########
File path:
sdks/java/io/neo4j/src/main/java/org/apache/beam/sdk/io/neo4j/Neo4jIO.java
##########
@@ -0,0 +1,1358 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.neo4j;
+
+import static
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
+
+import com.google.auto.value.AutoValue;
+import java.io.Serializable;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+import java.util.logging.Level;
+import org.apache.beam.repackaged.core.org.apache.commons.lang3.StringUtils;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.harness.JvmInitializer;
+import org.apache.beam.sdk.options.ValueProvider;
+import org.apache.beam.sdk.schemas.NoSuchSchemaException;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.SchemaRegistry;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.transforms.display.HasDisplayData;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PDone;
+import org.apache.beam.sdk.values.TypeDescriptor;
+import
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.neo4j.driver.AuthTokens;
+import org.neo4j.driver.Config;
+import org.neo4j.driver.Driver;
+import org.neo4j.driver.GraphDatabase;
+import org.neo4j.driver.Logging;
+import org.neo4j.driver.Record;
+import org.neo4j.driver.Result;
+import org.neo4j.driver.Session;
+import org.neo4j.driver.SessionConfig;
+import org.neo4j.driver.TransactionConfig;
+import org.neo4j.driver.TransactionWork;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This is a Beam IO to read from, and write data to, Neo4j.
+ *
+ * <p>
+ *
+ * <p>
+ *
+ * <h3>Driver configuration</h3>
+ *
+ * <p>To read from or write to Neo4j you have to provide a {@link
DriverConfiguration} using<br>
+ * 1. {@link DriverConfiguration#create()} (which must be {@link
Serializable});<br>
+ * 2. or {@link DriverConfiguration#create(String, String, String)} (URL,
username and password).
+ *
+ * <p>If you have trouble connecting to a Neo4j Aura database please try to
disable a few security
+ * algorithms in your JVM. This makes sure that the right one is picked to
connect:
+ *
+ * <p>
+ *
+ * <pre>{@code
+ * Security.setProperty(
+ * "jdk.tls.disabledAlgorithms",
+ * "SSLv3, RC4, DES, MD5withRSA, DH keySize < 1024, EC keySize < 224,
3DES_EDE_CBC, anon, NULL");
+ * }</pre>
+ *
+ * <p>
+ *
+ * <p>
+ *
+ * <p>To execute this code on GCP Dataflow you can create a class which
extends {@link
+ * JvmInitializer} and implement the {@link JvmInitializer#onStartup()}
method. You need to annotate
+ * this new class with {@link com.google.auto.service.AutoService}
+ *
+ * <pre>{@code
+ * @AutoService(value = JvmInitializer.class)
+ * }</pre>
+ *
+ * <p>
+ *
+ * <h3>Reading from Neo4j</h3>
+ *
+ * <p>{@link Neo4jIO#read()} source returns a bounded collection of {@code
OutputT} as a {@code
+ * PCollection<OutputT>}. OutputT is the type returned by the provided {@link
RowMapper}. It accepts
+ * parameters as input in the form of {@code ParameterT} as a {@code
PCollection<ParameterT>}
+ *
+ * <p>The following example reads ages to return the IDs of Person nodes. It
runs a Cypher query for
+ * each provided age.
+ *
+ * <p>The {@link ParametersMapper} maps input values to each execution of the
Cypher statement.
+ * These parameter values are stored in a {@link Map} (mapping String to
Object). The map is cleared
+ * after each execution.
+ *
+ * <p>The {@link RowMapper} converts output Neo4j {@link Record} values to the
output of the source.
+ *
+ * <pre>{@code
+ * pipeline.apply( Neo4jIO.<Integer, String>read()
+ *
.withDriverConfiguration(Neo4jIO.DriverConfiguration.create("neo4j://localhost:7687",
"neo4j", "password"))
+ * .withCypher("MATCH(n:Person) WHERE n.age = $age RETURN n.id")
+ * .withReadTransaction()
+ * .withCoder(StringUtf8Coder.of())
+ * .withParametersMapper((age, rowMap) -> rowMap.put("age", age))
+ * .withRowMapper( record -> record.get(0).asString() )
+ * );
+ * }</pre>
+ *
+ * <h3>Writing to Neo4j</h3>
+ *
+ * <p>Neo4j sink supports writing data to a graph. It writes a {@link
PCollection} to the graph by
+ * collecting a batch of rows after which all rows in this batch are written
together to Neo4j. This
+ * is done using the {@link WriteUnwind} sink transform.
+ *
+ * <p>Like the source, to configure the sink, you have to provide a {@link
DriverConfiguration}.
+ *
+ * <p>In the following example we'll merge a collection of {@link
org.apache.beam.sdk.values.Row}
+ * into Person nodes. Since this is a Sink it has no output and as such no
RowMapper is needed. The
+ * rows are being used as a container for the parameters of the Cypher
statement. The used Cypher in
+ * question needs to be an UNWIND statement. Like in the read case, the {@link
ParametersMapper}
+ * maps values to a {@code Map<String, Object>}. The difference is that the
resulting Map is stored
+ * in a {@link List} (containing maps) which in turn is stored in another Map
under the name
+ * provided by the {@link WriteUnwind#withUnwindMapName(String)} method. All
of this is handled
+ * automatically. You do need to provide the unwind map name so that you can
reference that in the
+ * UNWIND statement.
+ *
+ * <p>
+ *
+ * <p>For example:
+ *
+ * <pre>{@code
+ * pipeline
+ * .apply(...)
+ * .apply(Neo4jIO.<Row>writeUnwind()
+ *
.withDriverConfiguration(Neo4jIO.DriverConfiguration.create("neo4j://localhost:7687",
"neo4j", "password"))
+ * .withUnwindMapName("rows")
+ * .withCypher("UNWIND $rows AS row MERGE(n:Person { id : row.id } ) SET
n.firstName = row.first, n.lastName = row.last")
+ * .withParametersMapper( (row, map ) -> {
+ * map.put("id", row.getString("id"));
+ * map.put("first", row.getString("firstName"));
+ * map.put("last", row.getString("lastName"));
+ * })
+ * );
+ * }</pre>
+ */
+@Experimental(Experimental.Kind.SOURCE_SINK)
+public class Neo4jIO {
+
+ private static final Logger LOG = LoggerFactory.getLogger(Neo4jIO.class);
+
+ /**
+ * Creates a {@link Read} transform that reads all rows using a Neo4j Cypher query.
+ *
+ * <p>The returned transform has its fetch size preset to the Neo4j driver default, {@code
+ * Config.defaultConfig().fetchSize()}; nothing else is set on the builder here.
+ *
+ * @param <ParameterT> Type of the data representing query parameters.
+ * @param <OutputT> Type of the data to be read.
+ * @return a {@link Read} built with only the fetch size set
+ */
+ public static <ParameterT, OutputT> Read<ParameterT, OutputT> read() {
+ return new AutoValue_Neo4jIO_Read.Builder<ParameterT, OutputT>()
+
.setFetchSize(ValueProvider.StaticValueProvider.of(Config.defaultConfig().fetchSize()))
+ .build();
+ }
+
+ /**
+ * Creates a {@link WriteUnwind} transform that writes all rows using a Neo4j Cypher UNWIND
+ * statement. This sets a default batch size of 5000.
+ *
+ * @param <ParameterT> Type of the data representing query parameters.
+ * @return a {@link WriteUnwind} built with only the batch size set
+ */
+ public static <ParameterT> WriteUnwind<ParameterT> writeUnwind() {
+ return new AutoValue_Neo4jIO_WriteUnwind.Builder<ParameterT>()
+ .setBatchSize(ValueProvider.StaticValueProvider.of(5000L))
+ .build();
+ }
+
+ private static <ParameterT, OutputT> PCollection<OutputT>
getOutputPCollection(
+ PCollection<ParameterT> input, DoFn<ParameterT, OutputT> writeFn,
Coder<OutputT> coder) {
+ PCollection<OutputT> output =
input.apply(ParDo.of(writeFn)).setCoder(coder); // the coder is applied before any schema is attached
+
+ try { // best effort: attach a schema looked up from the coder's encoded type
+ TypeDescriptor<OutputT> typeDesc = coder.getEncodedTypeDescriptor();
+ SchemaRegistry registry = input.getPipeline().getSchemaRegistry();
+ Schema schema = registry.getSchema(typeDesc);
+ output.setSchema(
+ schema,
+ typeDesc,
+ registry.getToRowFunction(typeDesc),
+ registry.getFromRowFunction(typeDesc));
+ } catch (NoSuchSchemaException e) {
+ // Deliberately ignored: if the try block throws NoSuchSchemaException, setSchema is skipped and output is returned with only its coder set.
+ }
+ return output;
+ }
+
+ /**
+ * An interface used by {@link Read} for converting each {@link Record} of a Neo4j {@link
+ * Result} into an element of the resulting {@link PCollection}.
+ *
+ * <p>The single method, {@code mapRow}, receives one {@link Record} and may throw any {@link
+ * Exception}. Implementations must be {@link Serializable}.
+ *
+ * @param <T> Type of the element produced for each record.
+ */
+ @FunctionalInterface
+ public interface RowMapper<T> extends Serializable {
+ T mapRow(Record record) throws Exception;
+ }
+
+ /**
+ * An interface used by {@link Read} for converting input parameter data to
a parameters map which
+ * can be used by your Neo4j cypher query.
+ */
+ @FunctionalInterface
+ public interface ParametersMapper<ParameterT> extends Serializable {
+ /**
+ * Simply map an input record to a parameters map.
+ *
+ * @param input the input read
+ * @param parametersMap the parameters map to update in this lambda
+ */
+ void mapParameters(ParameterT input, Map<String, Object> parametersMap);
Review comment:
Looking at the code I was wondering if this mapper needs to be stateful?
Can't this just be a function `ParameterT -> Map<String, Object>`?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
Issue Time Tracking
-------------------
Worklog Id: (was: 678984)
Time Spent: 1h 50m (was: 1h 40m)
> Add Neo4jIO
> -----------
>
> Key: BEAM-1857
> URL: https://issues.apache.org/jira/browse/BEAM-1857
> Project: Beam
> Issue Type: New Feature
> Components: io-ideas
> Reporter: kassem shehady
> Priority: P2
> Labels: stale-P2
> Time Spent: 1h 50m
> Remaining Estimate: 0h
>
--
This message was sent by Atlassian Jira
(v8.20.1#820001)