[ https://issues.apache.org/jira/browse/DRILL-8371?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17651130#comment-17651130 ]

ASF GitHub Bot commented on DRILL-8371:
---------------------------------------

jnturton commented on code in PR #2722:
URL: https://github.com/apache/drill/pull/2722#discussion_r1055192926


##########
contrib/storage-splunk/src/main/java/org/apache/drill/exec/store/splunk/SplunkBatchWriter.java:
##########
@@ -98,27 +100,69 @@ public void updateSchema(VectorAccessible batch) {
   @Override
   public void startRecord() {
     logger.debug("Starting record");
-    // Ensure that the new record is empty. This is not strictly necessary, but it is a belt and suspenders approach.
-    splunkEvent.clear();
+    // Ensure that the new record is empty.
+    splunkEvent = new JSONObject();
   }
 
   @Override
-  public void endRecord() throws IOException {
+  public void endRecord() {
     logger.debug("Ending record");
+    recordCount++;
+
+    // Put event in buffer
+    eventBuffer.add(splunkEvent);
+
     // Write the event to the Splunk index
-    destinationIndex.submit(eventArgs, splunkEvent.toJSONString());
-    // Clear out the splunk event.
-    splunkEvent.clear();
+    if (recordCount >= config.getPluginConfig().getWriterBatchSize()) {
+      try {
+        writeEvents();
+      } catch (IOException e) {
+        throw  UserException.dataWriteError(e)
+            .message("Error writing data to Splunk: " + e.getMessage())
+            .build(logger);
+      }
+
+      // Reset record count
+      recordCount = 0;
+    }
   }
 
+
+  /*
+  args – Optional arguments for this stream. Valid parameters are: "host", "host_regex", "source", and "sourcetype".
+   */
   @Override
   public void abort() {
+    logger.debug("Aborting writing records to Splunk.");
     // No op
   }
 
   @Override
   public void cleanup() {
-    // No op
+    try {
+      writeEvents();
+    } catch (IOException e) {
+      throw  UserException.dataWriteError(e)
+          .message("Error writing data to Splunk: " + e.getMessage())
+          .build(logger);
+    }
+  }
+
+  private void writeEvents() throws IOException {
+    // Open the socket and stream, set up a timestamp
+    destinationIndex.attachWith(new ReceiverBehavior() {

Review Comment:
   This results in a dedicated TCP socket being opened and closed for every writer batch.
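The socket-per-batch churn the comment points out could be avoided by holding one connection for the writer's whole lifetime. A minimal sketch of that lifecycle (all names hypothetical; a plain counting stub stands in for the Splunk SDK's receiver, since `Index.attachWith` manages the socket itself):

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the Splunk receiver socket: it only counts
// opens/closes and records events, so the connection lifecycle is visible.
class StubReceiver {
  int opens = 0;
  int closes = 0;
  final List<String> received = new ArrayList<>();

  void open() { opens++; }
  void close() { closes++; }
  void send(String event) { received.add(event); }
}

// Holds one connection for the writer's lifetime: opened in the constructor,
// reused by every writeEvents() call, closed once in cleanup().
class BatchedSplunkWriter {
  private final StubReceiver receiver;
  private final List<String> eventBuffer = new ArrayList<>();

  BatchedSplunkWriter(StubReceiver receiver) {
    this.receiver = receiver;
    receiver.open();  // open once, not once per batch
  }

  void endRecord(String event) {
    eventBuffer.add(event);
  }

  // Flush one batch over the already-open connection.
  void writeEvents() {
    for (String event : eventBuffer) {
      receiver.send(event);
    }
    eventBuffer.clear();
  }

  void cleanup() {
    writeEvents();     // flush any remainder
    receiver.close();  // close once per writer, not once per batch
  }
}

public class ConnectionReuseSketch {
  public static void main(String[] args) {
    StubReceiver receiver = new StubReceiver();
    BatchedSplunkWriter writer = new BatchedSplunkWriter(receiver);
    writer.endRecord("{\"a\":1}");
    writer.writeEvents();  // first batch
    writer.endRecord("{\"a\":2}");
    writer.cleanup();      // second batch plus the single close
    // Two batches, but only one open and one close.
    System.out.println(receiver.opens + "/" + receiver.closes + "/" + receiver.received.size()); // prints 1/1/2
  }
}
```

The stub only illustrates the shape; the real change would mean opening the receiver stream once (e.g. when the writer starts) rather than inside each `writeEvents()` call.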



##########
contrib/storage-splunk/src/main/java/org/apache/drill/exec/store/splunk/SplunkBatchWriter.java:
##########
@@ -98,27 +100,69 @@ public void updateSchema(VectorAccessible batch) {
   @Override
   public void startRecord() {
     logger.debug("Starting record");
-    // Ensure that the new record is empty. This is not strictly necessary, but it is a belt and suspenders approach.
-    splunkEvent.clear();
+    // Ensure that the new record is empty.
+    splunkEvent = new JSONObject();
   }
 
   @Override
-  public void endRecord() throws IOException {
+  public void endRecord() {
     logger.debug("Ending record");
+    recordCount++;
+
+    // Put event in buffer
+    eventBuffer.add(splunkEvent);
+
     // Write the event to the Splunk index
-    destinationIndex.submit(eventArgs, splunkEvent.toJSONString());
-    // Clear out the splunk event.
-    splunkEvent.clear();
+    if (recordCount >= config.getPluginConfig().getWriterBatchSize()) {
+      try {
+        writeEvents();
+      } catch (IOException e) {
+        throw  UserException.dataWriteError(e)
+            .message("Error writing data to Splunk: " + e.getMessage())
+            .build(logger);
+      }
+
+      // Reset record count
+      recordCount = 0;
+    }
   }
 
+
+  /*
+  args – Optional arguments for this stream. Valid parameters are: "host", "host_regex", "source", and "sourcetype".
+   */
   @Override
   public void abort() {
+    logger.debug("Aborting writing records to Splunk.");

Review Comment:
   Would there be any use in clearing eventBuffer here?
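One plausible use: clearing `eventBuffer` in `abort()` would keep the later `cleanup()` call from flushing events that belong to the failed query. A minimal sketch with plain collections (names hypothetical, no Splunk SDK involved):

```java
import java.util.ArrayList;
import java.util.List;

// Minimal model of the writer's buffering: abort() discards buffered events
// so a later cleanup() does not ship records from a failed query.
class AbortAwareWriter {
  final List<String> eventBuffer = new ArrayList<>();
  final List<String> written = new ArrayList<>();

  void endRecord(String event) {
    eventBuffer.add(event);
  }

  void abort() {
    eventBuffer.clear();  // drop events belonging to the failed query
  }

  void cleanup() {
    written.addAll(eventBuffer);  // flush whatever is still buffered
    eventBuffer.clear();
  }
}

public class AbortBufferSketch {
  public static void main(String[] args) {
    AbortAwareWriter writer = new AbortAwareWriter();
    writer.endRecord("{\"a\":1}");
    writer.abort();    // query failed: discard the buffered event
    writer.cleanup();  // nothing left to flush
    System.out.println(writer.written.size()); // prints 0
  }
}
```

Without the clear, `cleanup()` would still flush the buffered events even after an abort.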





> Add Write/Append Capability to Splunk Plugin
> --------------------------------------------
>
>                 Key: DRILL-8371
>                 URL: https://issues.apache.org/jira/browse/DRILL-8371
>             Project: Apache Drill
>          Issue Type: Improvement
>          Components: Storage - Splunk
>    Affects Versions: 1.20.2
>            Reporter: Charles Givre
>            Assignee: Charles Givre
>            Priority: Major
>             Fix For: 2.0.0
>
>
> While Drill can currently read from Splunk indexes, it cannot write to them
> or create them. This proposed PR adds support for CTAS queries for Splunk as
> well as INSERT and DROP TABLE.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
