[
https://issues.apache.org/jira/browse/NIFI-4118?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16079392#comment-16079392
]
ASF GitHub Bot commented on NIFI-4118:
--------------------------------------
Github user jvwing commented on a diff in the pull request:
https://github.com/apache/nifi/pull/1942#discussion_r126293212
--- Diff:
nifi-nar-bundles/nifi-rethinkdb-bundle/nifi-rethinkdb-processors/src/test/java/org/apache/nifi/processors/rethinkdb/ITPutRethinkDBTest.java
---
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.rethinkdb;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import java.util.List;
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.json.simple.JSONArray;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import com.rethinkdb.RethinkDB;
+import com.rethinkdb.net.Connection;
+
+import net.minidev.json.JSONObject;
+
+/**
+ * Integration test for RethinkDB. Please ensure that the RethinkDB is
running
+ * on local host with default port and has database test with table test
and user
+ * admin with password admin before running the integration tests or set
the attributes in the
+ * test accordingly.
+ */
+@Ignore("Comment this out for running tests against a real instance of
RethinkDB")
+public class ITPutRethinkDBTest {
+ private TestRunner runner;
+ private Connection connection;
+ private String dbName = "test";
+ private String dbHost = "localhost";
+ private String dbPort = "28015";
+ private String user = "admin";
+ private String password = "admin";
+ private String table = "test";
+
+ @Before
+ public void setUp() throws Exception {
+ runner = TestRunners.newTestRunner(PutRethinkDB.class);
+ runner.setProperty(PutRethinkDB.DB_NAME, dbName);
+ runner.setProperty(PutRethinkDB.DB_HOST, dbHost);
+ runner.setProperty(PutRethinkDB.DB_PORT, dbPort);
+ runner.setProperty(PutRethinkDB.USERNAME, user);
+ runner.setProperty(PutRethinkDB.PASSWORD, password);
+ runner.setProperty(PutRethinkDB.TABLE_NAME, table);
+ runner.setProperty(PutRethinkDB.CHARSET, "UTF-8");
+ runner.setProperty(PutRethinkDB.CONFLICT_STRATEGY,
PutRethinkDB.CONFLICT_STRATEGY_UPDATE);
+ runner.setProperty(PutRethinkDB.DURABILITY,
PutRethinkDB.DURABILITY_HARD);
+ runner.setProperty(PutRethinkDB.MAX_DOCUMENTS_SIZE, "1 KB");
+ runner.assertValid();
+
+ connection = RethinkDB.r.connection().user(user,
password).db(dbName).hostname(dbHost).port(Integer.parseInt(dbPort)).connect();
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ runner = null;
+ connection.close();
+ connection = null;
+ }
+
+ @Test
+ public void testValidSingleMessage() {
+ RethinkDB.r.db(dbName).table(table).delete().run(connection);
+ long count =
RethinkDB.r.db(dbName).table(table).count().run(connection);
+ assertEquals("Count should be same", 0L, count);
+
+ JSONObject message = new JSONObject();
+ message.put("hello", "rethinkdb");
+ byte [] bytes = message.toJSONString().getBytes();
+ runner.enqueue(bytes);
+ runner.run(1,true,true);
+ runner.assertAllFlowFilesTransferred(PutRethinkDB.REL_SUCCESS, 1);
+
+ List<MockFlowFile> flowFiles =
runner.getFlowFilesForRelationship(PutRethinkDB.REL_SUCCESS);
+
assertEquals(flowFiles.get(0).getAttribute(PutRethinkDB.RETHINKDB_INSERT_RESULT_DELETED_KEY),
"0");
+
assertEquals(flowFiles.get(0).getAttribute(PutRethinkDB.RETHINKDB_INSERT_RESULT_ERROR_KEY),"0");
+
assertNotNull(flowFiles.get(0).getAttribute(PutRethinkDB.RETHINKDB_INSERT_RESULT_GENERATED_KEYS_KEY));
+
assertEquals(flowFiles.get(0).getAttribute(PutRethinkDB.RETHINKDB_INSERT_RESULT_INSERTED_KEY),"1");
+
assertEquals(flowFiles.get(0).getAttribute(PutRethinkDB.RETHINKDB_INSERT_RESULT_REPLACED_KEY),"0");
+
assertEquals(flowFiles.get(0).getAttribute(PutRethinkDB.RETHINKDB_INSERT_RESULT_SKIPPED_KEY),"0");
+
assertEquals(flowFiles.get(0).getAttribute(PutRethinkDB.RETHINKDB_INSERT_RESULT_UNCHANGED_KEY),"0");
+
assertEquals(flowFiles.get(0).getAttribute(PutRethinkDB.RETHINKDB_INSERT_RESULT_FIRST_ERROR_KEY),"null");
+
+ count =
RethinkDB.r.db(dbName).table(table).count().run(connection);
+ assertEquals("Count should be same", 1L, count);
+ }
+
+ @Test
+ public void testValidSingleMessageTwiceConflictUpdate() {
+ RethinkDB.r.db(dbName).table(table).delete().run(connection);
+ long count =
RethinkDB.r.db(dbName).table(table).count().run(connection);
+ assertEquals("Count should be same", 0L, count);
+
+ JSONObject message = new JSONObject();
+ message.put("id", "rethinkdb");
+ byte [] bytes = message.toJSONString().getBytes();
+ runner.enqueue(bytes);
+ runner.run(1,false,true);
+
+ List<MockFlowFile> flowFiles =
runner.getFlowFilesForRelationship(PutRethinkDB.REL_SUCCESS);
+
assertEquals(flowFiles.get(0).getAttribute(PutRethinkDB.RETHINKDB_INSERT_RESULT_DELETED_KEY),
"0");
+
assertEquals(flowFiles.get(0).getAttribute(PutRethinkDB.RETHINKDB_INSERT_RESULT_ERROR_KEY),"0");
+
assertNotNull(flowFiles.get(0).getAttribute(PutRethinkDB.RETHINKDB_INSERT_RESULT_GENERATED_KEYS_KEY));
+
assertEquals(flowFiles.get(0).getAttribute(PutRethinkDB.RETHINKDB_INSERT_RESULT_INSERTED_KEY),"1");
+
assertEquals(flowFiles.get(0).getAttribute(PutRethinkDB.RETHINKDB_INSERT_RESULT_REPLACED_KEY),"0");
+
assertEquals(flowFiles.get(0).getAttribute(PutRethinkDB.RETHINKDB_INSERT_RESULT_SKIPPED_KEY),"0");
+
assertEquals(flowFiles.get(0).getAttribute(PutRethinkDB.RETHINKDB_INSERT_RESULT_UNCHANGED_KEY),"0");
+
assertEquals(flowFiles.get(0).getAttribute(PutRethinkDB.RETHINKDB_INSERT_RESULT_FIRST_ERROR_KEY),"null");
+
+ runner.enqueue(bytes);
+ runner.run(1,true,true);
+
+ flowFiles =
runner.getFlowFilesForRelationship(PutRethinkDB.REL_SUCCESS);
+
assertEquals(flowFiles.get(1).getAttribute(PutRethinkDB.RETHINKDB_INSERT_RESULT_DELETED_KEY),
"0");
+
assertEquals(flowFiles.get(1).getAttribute(PutRethinkDB.RETHINKDB_INSERT_RESULT_ERROR_KEY),"0");
+
assertNotNull(flowFiles.get(1).getAttribute(PutRethinkDB.RETHINKDB_INSERT_RESULT_GENERATED_KEYS_KEY));
+
assertEquals(flowFiles.get(1).getAttribute(PutRethinkDB.RETHINKDB_INSERT_RESULT_INSERTED_KEY),"0");
+
assertEquals(flowFiles.get(1).getAttribute(PutRethinkDB.RETHINKDB_INSERT_RESULT_REPLACED_KEY),"0");
+
assertEquals(flowFiles.get(1).getAttribute(PutRethinkDB.RETHINKDB_INSERT_RESULT_SKIPPED_KEY),"0");
+
assertEquals(flowFiles.get(1).getAttribute(PutRethinkDB.RETHINKDB_INSERT_RESULT_UNCHANGED_KEY),"1");
+
assertEquals(flowFiles.get(1).getAttribute(PutRethinkDB.RETHINKDB_INSERT_RESULT_FIRST_ERROR_KEY),"null");
+
+ count =
RethinkDB.r.db(dbName).table(table).count().run(connection);
+ assertEquals("Count should be same", 1L, count);
+ }
+
+ @Test
+ public void testValidSingleMessageTwiceConflictError() {
+ runner.setProperty(PutRethinkDB.CONFLICT_STRATEGY,
PutRethinkDB.CONFLICT_STRATEGY_ERROR);
+ RethinkDB.r.db(dbName).table(table).delete().run(connection);
+ long count =
RethinkDB.r.db(dbName).table(table).count().run(connection);
+ assertEquals("Count should be same", 0L, count);
+
+ JSONObject message = new JSONObject();
+ message.put("id", "rethinkdb");
+ byte [] bytes = message.toJSONString().getBytes();
+ runner.enqueue(bytes);
+ runner.run(1,false,true);
+
+ List<MockFlowFile> flowFiles =
runner.getFlowFilesForRelationship(PutRethinkDB.REL_SUCCESS);
+
assertEquals(flowFiles.get(0).getAttribute(PutRethinkDB.RETHINKDB_INSERT_RESULT_DELETED_KEY),
"0");
+
assertEquals(flowFiles.get(0).getAttribute(PutRethinkDB.RETHINKDB_INSERT_RESULT_ERROR_KEY),"0");
+
assertNotNull(flowFiles.get(0).getAttribute(PutRethinkDB.RETHINKDB_INSERT_RESULT_GENERATED_KEYS_KEY));
+
assertEquals(flowFiles.get(0).getAttribute(PutRethinkDB.RETHINKDB_INSERT_RESULT_INSERTED_KEY),"1");
+
assertEquals(flowFiles.get(0).getAttribute(PutRethinkDB.RETHINKDB_INSERT_RESULT_REPLACED_KEY),"0");
+
assertEquals(flowFiles.get(0).getAttribute(PutRethinkDB.RETHINKDB_INSERT_RESULT_SKIPPED_KEY),"0");
+
assertEquals(flowFiles.get(0).getAttribute(PutRethinkDB.RETHINKDB_INSERT_RESULT_UNCHANGED_KEY),"0");
+
assertEquals(flowFiles.get(0).getAttribute(PutRethinkDB.RETHINKDB_INSERT_RESULT_FIRST_ERROR_KEY),"null");
+
+ runner.enqueue(bytes);
+ runner.run(1,true,true);
+
+ flowFiles =
runner.getFlowFilesForRelationship(PutRethinkDB.REL_FAILURE);
+
assertEquals(flowFiles.get(1).getAttribute(PutRethinkDB.RETHINKDB_INSERT_RESULT_DELETED_KEY),
"0");
--- End diff --
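This integration test failed for me. I believe the failing flowfile is now
at index 0 rather than index 1: the list is fetched from REL_FAILURE, which
is a separate relationship from REL_SUCCESS and holds only the second
(conflicting) flowfile.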
> Create Nifi RethinkDB Put processor
> -----------------------------------
>
> Key: NIFI-4118
> URL: https://issues.apache.org/jira/browse/NIFI-4118
> Project: Apache NiFi
> Issue Type: New Feature
> Components: Extensions
> Affects Versions: 1.3.0
> Environment: All
> Reporter: Mans Singh
> Assignee: Mans Singh
> Priority: Minor
> Labels: document, stream,
> Fix For: 1.4.0
>
>
> Create a NiFi processor for streaming documents into RethinkDB.
--
This message was sent by Atlassian JIRA
(v6.4.14#64029)