[ 
https://issues.apache.org/jira/browse/NIFI-4118?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16079392#comment-16079392
 ] 

ASF GitHub Bot commented on NIFI-4118:
--------------------------------------

Github user jvwing commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/1942#discussion_r126293212
  
    --- Diff: 
nifi-nar-bundles/nifi-rethinkdb-bundle/nifi-rethinkdb-processors/src/test/java/org/apache/nifi/processors/rethinkdb/ITPutRethinkDBTest.java
 ---
    @@ -0,0 +1,216 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.rethinkdb;
    +
    +import static org.junit.Assert.assertEquals;
    +import static org.junit.Assert.assertNotNull;
    +import java.util.List;
    +import org.apache.nifi.util.MockFlowFile;
    +import org.apache.nifi.util.TestRunner;
    +import org.apache.nifi.util.TestRunners;
    +import org.json.simple.JSONArray;
    +import org.junit.After;
    +import org.junit.Before;
    +import org.junit.Ignore;
    +import org.junit.Test;
    +
    +import com.rethinkdb.RethinkDB;
    +import com.rethinkdb.net.Connection;
    +
    +import net.minidev.json.JSONObject;
    +
    +/**
    + * Integration test for RethinkDB. Please ensure that the RethinkDB is 
running
    + * on local host with default port and has database test with table test 
and user
    + * admin with password admin before running the integration tests or set 
the attributes in the
    + * test accordingly.
    + */
    +@Ignore("Comment this out for running tests against a real instance of 
RethinkDB")
    +public class ITPutRethinkDBTest {
    +    private TestRunner runner;
    +    private Connection connection;
    +    private String dbName = "test";
    +    private String dbHost = "localhost";
    +    private String dbPort = "28015";
    +    private String user = "admin";
    +    private String password = "admin";
    +    private String table = "test";
    +
    +    @Before
    +    public void setUp() throws Exception {
    +        runner = TestRunners.newTestRunner(PutRethinkDB.class);
    +        runner.setProperty(PutRethinkDB.DB_NAME, dbName);
    +        runner.setProperty(PutRethinkDB.DB_HOST, dbHost);
    +        runner.setProperty(PutRethinkDB.DB_PORT, dbPort);
    +        runner.setProperty(PutRethinkDB.USERNAME, user);
    +        runner.setProperty(PutRethinkDB.PASSWORD, password);
    +        runner.setProperty(PutRethinkDB.TABLE_NAME, table);
    +        runner.setProperty(PutRethinkDB.CHARSET, "UTF-8");
    +        runner.setProperty(PutRethinkDB.CONFLICT_STRATEGY, 
PutRethinkDB.CONFLICT_STRATEGY_UPDATE);
    +        runner.setProperty(PutRethinkDB.DURABILITY, 
PutRethinkDB.DURABILITY_HARD);
    +        runner.setProperty(PutRethinkDB.MAX_DOCUMENTS_SIZE, "1 KB");
    +        runner.assertValid();
    +
    +        connection = RethinkDB.r.connection().user(user, 
password).db(dbName).hostname(dbHost).port(Integer.parseInt(dbPort)).connect();
    +    }
    +
    +    @After
    +    public void tearDown() throws Exception {
    +        runner = null;
    +        connection.close();
    +        connection = null;
    +    }
    +
    +    @Test
    +    public void testValidSingleMessage() {
    +        RethinkDB.r.db(dbName).table(table).delete().run(connection);
    +        long count = 
RethinkDB.r.db(dbName).table(table).count().run(connection);
    +        assertEquals("Count should be same", 0L, count);
    +
    +        JSONObject message = new JSONObject();
    +        message.put("hello", "rethinkdb");
    +        byte [] bytes = message.toJSONString().getBytes();
    +        runner.enqueue(bytes);
    +        runner.run(1,true,true);
    +        runner.assertAllFlowFilesTransferred(PutRethinkDB.REL_SUCCESS, 1);
    +
    +        List<MockFlowFile> flowFiles = 
runner.getFlowFilesForRelationship(PutRethinkDB.REL_SUCCESS);
    +        
assertEquals(flowFiles.get(0).getAttribute(PutRethinkDB.RETHINKDB_INSERT_RESULT_DELETED_KEY),
 "0");
    +        
assertEquals(flowFiles.get(0).getAttribute(PutRethinkDB.RETHINKDB_INSERT_RESULT_ERROR_KEY),"0");
    +        
assertNotNull(flowFiles.get(0).getAttribute(PutRethinkDB.RETHINKDB_INSERT_RESULT_GENERATED_KEYS_KEY));
    +        
assertEquals(flowFiles.get(0).getAttribute(PutRethinkDB.RETHINKDB_INSERT_RESULT_INSERTED_KEY),"1");
    +        
assertEquals(flowFiles.get(0).getAttribute(PutRethinkDB.RETHINKDB_INSERT_RESULT_REPLACED_KEY),"0");
    +        
assertEquals(flowFiles.get(0).getAttribute(PutRethinkDB.RETHINKDB_INSERT_RESULT_SKIPPED_KEY),"0");
    +        
assertEquals(flowFiles.get(0).getAttribute(PutRethinkDB.RETHINKDB_INSERT_RESULT_UNCHANGED_KEY),"0");
    +        
assertEquals(flowFiles.get(0).getAttribute(PutRethinkDB.RETHINKDB_INSERT_RESULT_FIRST_ERROR_KEY),"null");
    +
    +        count = 
RethinkDB.r.db(dbName).table(table).count().run(connection);
    +        assertEquals("Count should be same", 1L, count);
    +    }
    +
    +    @Test
    +    public void testValidSingleMessageTwiceConflictUpdate() {
    +        RethinkDB.r.db(dbName).table(table).delete().run(connection);
    +        long count = 
RethinkDB.r.db(dbName).table(table).count().run(connection);
    +        assertEquals("Count should be same", 0L, count);
    +
    +        JSONObject message = new JSONObject();
    +        message.put("id", "rethinkdb");
    +        byte [] bytes = message.toJSONString().getBytes();
    +        runner.enqueue(bytes);
    +        runner.run(1,false,true);
    +
    +        List<MockFlowFile> flowFiles = 
runner.getFlowFilesForRelationship(PutRethinkDB.REL_SUCCESS);
    +        
assertEquals(flowFiles.get(0).getAttribute(PutRethinkDB.RETHINKDB_INSERT_RESULT_DELETED_KEY),
 "0");
    +        
assertEquals(flowFiles.get(0).getAttribute(PutRethinkDB.RETHINKDB_INSERT_RESULT_ERROR_KEY),"0");
    +        
assertNotNull(flowFiles.get(0).getAttribute(PutRethinkDB.RETHINKDB_INSERT_RESULT_GENERATED_KEYS_KEY));
    +        
assertEquals(flowFiles.get(0).getAttribute(PutRethinkDB.RETHINKDB_INSERT_RESULT_INSERTED_KEY),"1");
    +        
assertEquals(flowFiles.get(0).getAttribute(PutRethinkDB.RETHINKDB_INSERT_RESULT_REPLACED_KEY),"0");
    +        
assertEquals(flowFiles.get(0).getAttribute(PutRethinkDB.RETHINKDB_INSERT_RESULT_SKIPPED_KEY),"0");
    +        
assertEquals(flowFiles.get(0).getAttribute(PutRethinkDB.RETHINKDB_INSERT_RESULT_UNCHANGED_KEY),"0");
    +        
assertEquals(flowFiles.get(0).getAttribute(PutRethinkDB.RETHINKDB_INSERT_RESULT_FIRST_ERROR_KEY),"null");
    +
    +        runner.enqueue(bytes);
    +        runner.run(1,true,true);
    +
    +        flowFiles = 
runner.getFlowFilesForRelationship(PutRethinkDB.REL_SUCCESS);
    +        
assertEquals(flowFiles.get(1).getAttribute(PutRethinkDB.RETHINKDB_INSERT_RESULT_DELETED_KEY),
 "0");
    +        
assertEquals(flowFiles.get(1).getAttribute(PutRethinkDB.RETHINKDB_INSERT_RESULT_ERROR_KEY),"0");
    +        
assertNotNull(flowFiles.get(1).getAttribute(PutRethinkDB.RETHINKDB_INSERT_RESULT_GENERATED_KEYS_KEY));
    +        
assertEquals(flowFiles.get(1).getAttribute(PutRethinkDB.RETHINKDB_INSERT_RESULT_INSERTED_KEY),"0");
    +        
assertEquals(flowFiles.get(1).getAttribute(PutRethinkDB.RETHINKDB_INSERT_RESULT_REPLACED_KEY),"0");
    +        
assertEquals(flowFiles.get(1).getAttribute(PutRethinkDB.RETHINKDB_INSERT_RESULT_SKIPPED_KEY),"0");
    +        
assertEquals(flowFiles.get(1).getAttribute(PutRethinkDB.RETHINKDB_INSERT_RESULT_UNCHANGED_KEY),"1");
    +        
assertEquals(flowFiles.get(1).getAttribute(PutRethinkDB.RETHINKDB_INSERT_RESULT_FIRST_ERROR_KEY),"null");
    +
    +        count = 
RethinkDB.r.db(dbName).table(table).count().run(connection);
    +        assertEquals("Count should be same", 1L, count);
    +    }
    +
    +    @Test
    +    public void testValidSingleMessageTwiceConflictError() {
    +        runner.setProperty(PutRethinkDB.CONFLICT_STRATEGY, 
PutRethinkDB.CONFLICT_STRATEGY_ERROR);
    +        RethinkDB.r.db(dbName).table(table).delete().run(connection);
    +        long count = 
RethinkDB.r.db(dbName).table(table).count().run(connection);
    +        assertEquals("Count should be same", 0L, count);
    +
    +        JSONObject message = new JSONObject();
    +        message.put("id", "rethinkdb");
    +        byte [] bytes = message.toJSONString().getBytes();
    +        runner.enqueue(bytes);
    +        runner.run(1,false,true);
    +
    +        List<MockFlowFile> flowFiles = 
runner.getFlowFilesForRelationship(PutRethinkDB.REL_SUCCESS);
    +        
assertEquals(flowFiles.get(0).getAttribute(PutRethinkDB.RETHINKDB_INSERT_RESULT_DELETED_KEY),
 "0");
    +        
assertEquals(flowFiles.get(0).getAttribute(PutRethinkDB.RETHINKDB_INSERT_RESULT_ERROR_KEY),"0");
    +        
assertNotNull(flowFiles.get(0).getAttribute(PutRethinkDB.RETHINKDB_INSERT_RESULT_GENERATED_KEYS_KEY));
    +        
assertEquals(flowFiles.get(0).getAttribute(PutRethinkDB.RETHINKDB_INSERT_RESULT_INSERTED_KEY),"1");
    +        
assertEquals(flowFiles.get(0).getAttribute(PutRethinkDB.RETHINKDB_INSERT_RESULT_REPLACED_KEY),"0");
    +        
assertEquals(flowFiles.get(0).getAttribute(PutRethinkDB.RETHINKDB_INSERT_RESULT_SKIPPED_KEY),"0");
    +        
assertEquals(flowFiles.get(0).getAttribute(PutRethinkDB.RETHINKDB_INSERT_RESULT_UNCHANGED_KEY),"0");
    +        
assertEquals(flowFiles.get(0).getAttribute(PutRethinkDB.RETHINKDB_INSERT_RESULT_FIRST_ERROR_KEY),"null");
    +
    +        runner.enqueue(bytes);
    +        runner.run(1,true,true);
    +
    +        flowFiles = 
runner.getFlowFilesForRelationship(PutRethinkDB.REL_FAILURE);
    +        
assertEquals(flowFiles.get(1).getAttribute(PutRethinkDB.RETHINKDB_INSERT_RESULT_DELETED_KEY),
 "0");
    --- End diff --
    
    This integration test failed for me.  I believe the failing flowfile is now 
at index 0, since REL_FAILURE is separate.


> Create Nifi RethinkDB Put processor
> -----------------------------------
>
>                 Key: NIFI-4118
>                 URL: https://issues.apache.org/jira/browse/NIFI-4118
>             Project: Apache NiFi
>          Issue Type: New Feature
>          Components: Extensions
>    Affects Versions: 1.3.0
>         Environment: All
>            Reporter: Mans Singh
>            Assignee: Mans Singh
>            Priority: Minor
>              Labels: document, stream,
>             Fix For: 1.4.0
>
>
> Create Nifi processor for streaming documents into RethinkDB.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

Reply via email to