[ 
https://issues.apache.org/jira/browse/NIFI-4731?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16610760#comment-16610760
 ] 

ASF GitHub Bot commented on NIFI-4731:
--------------------------------------

Github user pvillard31 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2682#discussion_r216704031
  
    --- Diff: 
nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/AbstractBigQueryProcessor.java
 ---
    @@ -0,0 +1,122 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.nifi.processors.gcp.bigquery;
    +
    +import com.google.api.gax.retrying.RetrySettings;
    +import com.google.auth.oauth2.GoogleCredentials;
    +import com.google.cloud.bigquery.BigQuery;
    +import com.google.cloud.bigquery.BigQueryOptions;
    +import com.google.common.collect.ImmutableList;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.expression.ExpressionLanguageScope;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.processors.gcp.AbstractGCPProcessor;
    +import org.apache.nifi.util.StringUtils;
    +
    +import java.util.Arrays;
    +import java.util.Collections;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Set;
    +
    +/**
    + * Base class for creating processors that connect to GCP BiqQuery service
    + */
    +public abstract class AbstractBigQueryProcessor extends 
AbstractGCPProcessor<BigQuery, BigQueryOptions> {
    +    static final int BUFFER_SIZE = 65536;
    +    public static final Relationship REL_SUCCESS =
    +            new Relationship.Builder().name("success")
    +                    .description("FlowFiles are routed to this 
relationship after a successful Google BigQuery operation.")
    +                    .build();
    +    public static final Relationship REL_FAILURE =
    +            new Relationship.Builder().name("failure")
    +                    .description("FlowFiles are routed to this 
relationship if the Google BigQuery operation fails.")
    +                    .build();
    +
    +    public static final Set<Relationship> relationships = 
Collections.unmodifiableSet(
    +            new HashSet<>(Arrays.asList(REL_SUCCESS, REL_FAILURE)));
    +
    +    public static final PropertyDescriptor DATASET = new PropertyDescriptor
    +            .Builder().name(BigQueryAttributes.DATASET_ATTR)
    +            .displayName("Dataset")
    +            .description(BigQueryAttributes.DATASET_DESC)
    +            .required(true)
    +            .defaultValue("${" + BigQueryAttributes.DATASET_ATTR + "}")
    +            
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
    +            .addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor TABLE_NAME = new 
PropertyDescriptor
    +            .Builder().name(BigQueryAttributes.TABLE_NAME_ATTR)
    +            .displayName("Table Name")
    +            .description(BigQueryAttributes.TABLE_NAME_DESC)
    +            .required(true)
    +            .defaultValue("${" + BigQueryAttributes.TABLE_NAME_ATTR + "}")
    +            
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
    +            .addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor TABLE_SCHEMA = new 
PropertyDescriptor
    +            .Builder().name(BigQueryAttributes.TABLE_SCHEMA_ATTR)
    +            .displayName("Table Schema")
    +            .description(BigQueryAttributes.TABLE_SCHEMA_DESC)
    +            .required(false)
    +            
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor READ_TIMEOUT = new 
PropertyDescriptor
    --- End diff --
    
    You missed that property in the list of supported property descriptors for 
the processor. It causes a NPE:
    ````
    2018-09-11 17:00:21,606 ERROR [Timer-Driven Process Thread-6] 
o.a.n.p.gcp.bigquery.PutBigQueryBatch 
PutBigQueryBatch[id=c8b9f07f-0165-1000-80e2-3ab46d7fb69f] null: 
java.lang.NullPointerException
    java.lang.NullPointerException: null
            at 
org.apache.nifi.processors.gcp.bigquery.PutBigQueryBatch.onTrigger(PutBigQueryBatch.java:229)
            at 
org.apache.nifi.processor.AbstractProcessor.onTrigger(AbstractProcessor.java:27)
            at 
org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1165)
            at 
org.apache.nifi.controller.tasks.ConnectableTask.invoke(ConnectableTask.java:203)
            at 
org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:117)
            at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
            at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
            at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
            at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
            at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
            at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
            at java.lang.Thread.run(Thread.java:745)
    ````


> BigQuery processors
> -------------------
>
>                 Key: NIFI-4731
>                 URL: https://issues.apache.org/jira/browse/NIFI-4731
>             Project: Apache NiFi
>          Issue Type: New Feature
>          Components: Extensions
>            Reporter: Mikhail Sosonkin
>            Priority: Major
>
> NIFI should have processors for putting data into BigQuery (Streaming and 
> Batch).
> Initial working processors can be found this repository: 
> https://github.com/nologic/nifi/tree/NIFI-4731/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery
> I'd like to get them into Nifi proper.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to