[ 
https://issues.apache.org/jira/browse/DRILL-8371?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17650079#comment-17650079
 ] 

ASF GitHub Bot commented on DRILL-8371:
---------------------------------------

cgivre commented on code in PR #2722:
URL: https://github.com/apache/drill/pull/2722#discussion_r1053981884


##########
contrib/storage-splunk/src/main/java/org/apache/drill/exec/store/splunk/SplunkBatchWriter.java:
##########
@@ -0,0 +1,309 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store.splunk;
+
+
+import com.splunk.Args;
+import com.splunk.Index;
+import com.splunk.IndexCollection;
+import com.splunk.Service;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.exec.proto.UserBitShared.UserCredentials;
+import org.apache.drill.exec.record.VectorAccessible;
+import org.apache.drill.exec.store.AbstractRecordWriter;
+import org.apache.drill.exec.store.EventBasedRecordWriter.FieldConverter;
+import org.apache.drill.exec.vector.complex.reader.FieldReader;
+import org.json.simple.JSONObject;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+
+public class SplunkBatchWriter extends AbstractRecordWriter {
+
+  private static final Logger logger = LoggerFactory.getLogger(SplunkBatchWriter.class);
+  private static final String DEFAULT_SOURCETYPE = "drill";
+  private final UserCredentials userCredentials;
+  private final List<String> tableIdentifier;
+  private final SplunkWriter config;
+  private final Args eventArgs;
+  protected final Service splunkService;
+  private final JSONObject splunkEvent;
+  protected Index destinationIndex;
+
+
+  public SplunkBatchWriter(UserCredentials userCredentials, List<String> tableIdentifier, SplunkWriter config) {
+    this.config = config;
+    this.tableIdentifier = tableIdentifier;
+    this.userCredentials = userCredentials;
+    this.splunkEvent = new JSONObject();
+    SplunkConnection connection = new SplunkConnection(config.getPluginConfig(), userCredentials.getUserName());
+    this.splunkService = connection.connect();
+
+    // Populate event arguments
+    this.eventArgs = new Args();
+    eventArgs.put("sourcetype", DEFAULT_SOURCETYPE);
+  }
+
+  @Override
+  public void init(Map<String, String> writerOptions) throws IOException {
+    // No op
+  }
+
+  /**
+   * Update the schema in the RecordWriter. Called before any records are written. In this
+   * case, we create the index in Splunk here. Splunk's API is sparse and does little in the
+   * way of error checking or providing feedback if the operation fails.
+   *
+   * @param batch {@link VectorAccessible} The incoming batch
+   */
+  @Override
+  public void updateSchema(VectorAccessible batch) {
+    logger.debug("Updating schema for Splunk");
+
+    // Get the collection of indexes
+    IndexCollection indexes = splunkService.getIndexes();
+    try {
+      String indexName = tableIdentifier.get(0);
+      indexes.create(indexName);
+      destinationIndex = splunkService.getIndexes().get(indexName);
+    } catch (Exception e) {
+      // We have to catch a generic exception here, as Splunk's SDK does not really
+      // provide any kind of failure messaging.
+      throw UserException.systemError(e)
+        .message("Error creating new index in Splunk plugin: " + 
e.getMessage())
+        .build(logger);
+    }
+  }
+
+
+  @Override
+  public void startRecord() {
+    logger.debug("Starting record");
+    // Ensure that the new record is empty. This is not strictly necessary, but it is a belt-and-suspenders approach.
+    splunkEvent.clear();

Review Comment:
   I removed this from the `endRecord` method.
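
For context, the `endRecord` counterpart is outside this hunk. A minimal sketch of
what it might look like, assuming the accumulated JSON event is flushed through the
Splunk SDK's `Index.submit(Args, String)` overload (the overload exists in
`com.splunk.Index`, but the PR's actual implementation is not shown here):

    @Override
    public void endRecord() {
      // Hypothetical sketch; not the code under review. Submit the event
      // accumulated by the field converters to the destination index,
      // tagged with the default sourcetype carried in eventArgs.
      destinationIndex.submit(eventArgs, splunkEvent.toJSONString());
      // Note: splunkEvent.clear() now happens in startRecord() (see above),
      // so the event object is reset before the next record begins.
    }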





> Add Write/Append Capability to Splunk Plugin
> --------------------------------------------
>
>                 Key: DRILL-8371
>                 URL: https://issues.apache.org/jira/browse/DRILL-8371
>             Project: Apache Drill
>          Issue Type: Improvement
>          Components: Storage - Splunk
>    Affects Versions: 1.20.2
>            Reporter: Charles Givre
>            Assignee: Charles Givre
>            Priority: Major
>             Fix For: 2.0.0
>
>
> While Drill can currently read from Splunk indexes, it cannot write to them 
> or create them. This proposed PR adds support for CTAS queries for Splunk, as 
> well as INSERT and DROP TABLE.
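
For illustration, these are the statement shapes the change enables (hypothetical
examples; `splunk` is the storage plugin name and `t1` a placeholder index name):

    CREATE TABLE splunk.t1 AS SELECT * FROM cp.`employee.json`;
    INSERT INTO splunk.t1 SELECT * FROM cp.`employee.json`;
    DROP TABLE splunk.t1;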



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
