[ https://issues.apache.org/jira/browse/NIFI-4731?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16610736#comment-16610736 ]
ASF GitHub Bot commented on NIFI-4731: -------------------------------------- Github user pvillard31 commented on a diff in the pull request: https://github.com/apache/nifi/pull/2682#discussion_r216697497 --- Diff: nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/PutBigQueryBatch.java --- @@ -0,0 +1,269 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.nifi.processors.gcp.bigquery; + +import com.google.cloud.RetryOption; +import com.google.cloud.bigquery.BigQuery; +import com.google.cloud.bigquery.FormatOptions; +import com.google.cloud.bigquery.Job; +import com.google.cloud.bigquery.JobInfo; +import com.google.cloud.bigquery.Schema; +import com.google.cloud.bigquery.TableDataWriteChannel; +import com.google.cloud.bigquery.TableId; +import com.google.cloud.bigquery.WriteChannelConfiguration; +import com.google.common.collect.ImmutableList; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.SeeAlso; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.PropertyValue; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.logging.LogLevel; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.processors.gcp.storage.DeleteGCSObject; +import org.apache.nifi.processors.gcp.storage.PutGCSObject; +import org.apache.nifi.util.StringUtils; +import org.threeten.bp.Duration; +import org.threeten.bp.temporal.ChronoUnit; + +import java.nio.ByteBuffer; +import java.nio.channels.Channels; +import java.nio.channels.ReadableByteChannel; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +/** + * A processor for batch loading data into a Google BigQuery table + */ + +@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED) +@Tags({"google", 
"google cloud", "bq", "bigquery"}) +@CapabilityDescription("Batch loads flow files to a Google BigQuery table.") +@SeeAlso({PutGCSObject.class, DeleteGCSObject.class}) + +@WritesAttributes({ + @WritesAttribute(attribute = BigQueryAttributes.DATASET_ATTR, description = BigQueryAttributes.DATASET_DESC), + @WritesAttribute(attribute = BigQueryAttributes.TABLE_NAME_ATTR, description = BigQueryAttributes.TABLE_NAME_DESC), + @WritesAttribute(attribute = BigQueryAttributes.TABLE_SCHEMA_ATTR, description = BigQueryAttributes.TABLE_SCHEMA_DESC), + @WritesAttribute(attribute = BigQueryAttributes.SOURCE_TYPE_ATTR, description = BigQueryAttributes.SOURCE_TYPE_DESC), + @WritesAttribute(attribute = BigQueryAttributes.IGNORE_UNKNOWN_ATTR, description = BigQueryAttributes.IGNORE_UNKNOWN_DESC), + @WritesAttribute(attribute = BigQueryAttributes.CREATE_DISPOSITION_ATTR, description = BigQueryAttributes.CREATE_DISPOSITION_DESC), + @WritesAttribute(attribute = BigQueryAttributes.WRITE_DISPOSITION_ATTR, description = BigQueryAttributes.WRITE_DISPOSITION_DESC), + @WritesAttribute(attribute = BigQueryAttributes.MAX_BADRECORDS_ATTR, description = BigQueryAttributes.MAX_BADRECORDS_DESC), + @WritesAttribute(attribute = BigQueryAttributes.JOB_CREATE_TIME_ATTR, description = BigQueryAttributes.JOB_CREATE_TIME_DESC), + @WritesAttribute(attribute = BigQueryAttributes.JOB_END_TIME_ATTR, description = BigQueryAttributes.JOB_END_TIME_DESC), + @WritesAttribute(attribute = BigQueryAttributes.JOB_START_TIME_ATTR, description = BigQueryAttributes.JOB_START_TIME_DESC), + @WritesAttribute(attribute = BigQueryAttributes.JOB_LINK_ATTR, description = BigQueryAttributes.JOB_LINK_DESC), + @WritesAttribute(attribute = BigQueryAttributes.JOB_ERROR_MSG_ATTR, description = BigQueryAttributes.JOB_ERROR_MSG_DESC), + @WritesAttribute(attribute = BigQueryAttributes.JOB_ERROR_REASON_ATTR, description = BigQueryAttributes.JOB_ERROR_REASON_DESC), + @WritesAttribute(attribute = BigQueryAttributes.JOB_ERROR_LOCATION_ATTR, 
description = BigQueryAttributes.JOB_ERROR_LOCATION_DESC) +}) + +public class PutBigQueryBatch extends AbstractBigQueryProcessor { + + public static final PropertyDescriptor SOURCE_TYPE = new PropertyDescriptor + .Builder().name(BigQueryAttributes.SOURCE_TYPE_ATTR) + .displayName("Load file type") + .description(BigQueryAttributes.SOURCE_TYPE_DESC) + .required(true) + .allowableValues(FormatOptions.json().getType(), FormatOptions.avro().getType(), FormatOptions.csv().getType()) + .defaultValue(FormatOptions.avro().getType()) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + + public static final PropertyDescriptor IGNORE_UNKNOWN = new PropertyDescriptor.Builder() + .name(BigQueryAttributes.IGNORE_UNKNOWN_ATTR) + .displayName("Ignore Unknown Values") + .description(BigQueryAttributes.IGNORE_UNKNOWN_DESC) + .required(true) + .addValidator(StandardValidators.BOOLEAN_VALIDATOR) + .allowableValues("true", "false") + .defaultValue("true") + .build(); + + public static final PropertyDescriptor CREATE_DISPOSITION = new PropertyDescriptor.Builder() + .name(BigQueryAttributes.CREATE_DISPOSITION_ATTR) + .displayName("Create Disposition") + .description(BigQueryAttributes.CREATE_DISPOSITION_DESC) + .required(true) + .allowableValues(JobInfo.CreateDisposition.CREATE_IF_NEEDED.name(), JobInfo.CreateDisposition.CREATE_NEVER.name()) + .defaultValue(JobInfo.CreateDisposition.CREATE_IF_NEEDED.name()) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + + public static final PropertyDescriptor WRITE_DISPOSITION = new PropertyDescriptor.Builder() + .name(BigQueryAttributes.WRITE_DISPOSITION_ATTR) + .displayName("Write Disposition") + .description(BigQueryAttributes.WRITE_DISPOSITION_DESC) + .required(true) + .allowableValues(JobInfo.WriteDisposition.WRITE_EMPTY.name(), JobInfo.WriteDisposition.WRITE_APPEND.name(), JobInfo.WriteDisposition.WRITE_TRUNCATE.name()) + .defaultValue(JobInfo.WriteDisposition.WRITE_EMPTY.name()) + 
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + + public static final PropertyDescriptor MAXBAD_RECORDS = new PropertyDescriptor.Builder() + .name(BigQueryAttributes.MAX_BADRECORDS_ATTR) + .displayName("Max Bad Records") + .description(BigQueryAttributes.MAX_BADRECORDS_DESC) + .required(true) + .defaultValue("0") + .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR) + .build(); + + private Schema schemaCache = null; + + public PutBigQueryBatch() { + + } + + @Override + public List<PropertyDescriptor> getSupportedPropertyDescriptors() { + return ImmutableList.<PropertyDescriptor>builder() + .addAll(super.getSupportedPropertyDescriptors()) + .add(DATASET) + .add(TABLE_NAME) + .add(TABLE_SCHEMA) + .add(SOURCE_TYPE) + .add(CREATE_DISPOSITION) + .add(WRITE_DISPOSITION) + .add(MAXBAD_RECORDS) + .add(IGNORE_UNKNOWN) + .build(); + } + + @Override + protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) { + return new PropertyDescriptor.Builder() + .name(propertyDescriptorName) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(true) + .dynamic(true) + .build(); + } + + @Override + @OnScheduled + public void onScheduled(ProcessContext context) { + super.onScheduled(context); + } + + @Override + public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException { + FlowFile flowFile = session.get(); + if (flowFile == null) { + return; + } + + final Map<String, String> attributes = new HashMap<>(); + + final BigQuery bq = getCloudService(); + + final String projectId = context.getProperty(PROJECT_ID).evaluateAttributeExpressions().getValue(); + final String dataset = context.getProperty(DATASET).evaluateAttributeExpressions(flowFile).getValue(); + final String tableName = context.getProperty(TABLE_NAME).evaluateAttributeExpressions(flowFile).getValue(); + + final TableId tableId; + if (StringUtils.isEmpty(projectId)) { + tableId = 
TableId.of(dataset, tableName); + } else { + tableId = TableId.of(projectId, dataset, tableName); + } + + final String fileType = context.getProperty(SOURCE_TYPE).getValue(); + + String schemaString = context.getProperty(TABLE_SCHEMA).evaluateAttributeExpressions().getValue(); + Schema schema = BqUtils.schemaFromString(schemaString); + + WriteChannelConfiguration writeChannelConfiguration = + WriteChannelConfiguration.newBuilder(tableId) + .setCreateDisposition(JobInfo.CreateDisposition.valueOf(context.getProperty(CREATE_DISPOSITION).getValue())) + .setWriteDisposition(JobInfo.WriteDisposition.valueOf(context.getProperty(WRITE_DISPOSITION).getValue())) + .setIgnoreUnknownValues(context.getProperty(IGNORE_UNKNOWN).asBoolean()) + .setMaxBadRecords(context.getProperty(MAXBAD_RECORDS).asInteger()) + .setSchema(schema) + .setFormatOptions(FormatOptions.of(fileType)) + .build(); + + TableDataWriteChannel writer = bq.writer(writeChannelConfiguration); --- End diff -- This should also be in the try/catch, as it can raise an exception, which should cause the processor to yield. 
Example: ```` com.google.cloud.bigquery.BigQueryException: 404 Not Found Not Found com.google.cloud.bigquery.BigQueryException: 404 Not Found Not Found at com.google.cloud.bigquery.spi.v2.HttpBigQueryRpc.translate(HttpBigQueryRpc.java:99) at com.google.cloud.bigquery.spi.v2.HttpBigQueryRpc.open(HttpBigQueryRpc.java:419) at com.google.cloud.bigquery.TableDataWriteChannel$2.call(TableDataWriteChannel.java:82) at com.google.cloud.bigquery.TableDataWriteChannel$2.call(TableDataWriteChannel.java:77) at com.google.api.gax.retrying.DirectRetryingExecutor.submit(DirectRetryingExecutor.java:89) at com.google.cloud.RetryHelper.run(RetryHelper.java:74) at com.google.cloud.RetryHelper.runWithRetries(RetryHelper.java:51) at com.google.cloud.bigquery.TableDataWriteChannel.open(TableDataWriteChannel.java:76) at com.google.cloud.bigquery.TableDataWriteChannel.<init>(TableDataWriteChannel.java:41) at com.google.cloud.bigquery.BigQueryImpl.writer(BigQueryImpl.java:729) at com.google.cloud.bigquery.BigQueryImpl.writer(BigQueryImpl.java:720) at org.apache.nifi.processors.gcp.bigquery.PutBigQueryBatch.onTrigger(PutBigQueryBatch.java:211) at org.apache.nifi.processor.AbstractProcessor.onTrigger(AbstractProcessor.java:27) at org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1165) at org.apache.nifi.controller.tasks.ConnectableTask.invoke(ConnectableTask.java:203) at org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:117) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308) at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180) at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745) Caused by: com.google.api.client.http.HttpResponseException: 404 Not Found Not Found at com.google.api.client.http.HttpRequest.execute(HttpRequest.java:1070) at com.google.cloud.bigquery.spi.v2.HttpBigQueryRpc.open(HttpBigQueryRpc.java:416) ... 21 common frames omitted ```` > BigQuery processors > ------------------- > > Key: NIFI-4731 > URL: https://issues.apache.org/jira/browse/NIFI-4731 > Project: Apache NiFi > Issue Type: New Feature > Components: Extensions > Reporter: Mikhail Sosonkin > Priority: Major > > NiFi should have processors for putting data into BigQuery (Streaming and > Batch). > Initial working processors can be found in this repository: > https://github.com/nologic/nifi/tree/NIFI-4731/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery > I'd like to get them into NiFi proper. -- This message was sent by Atlassian JIRA (v7.6.3#76005)