[
https://issues.apache.org/jira/browse/DRILL-8371?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17651130#comment-17651130
]
ASF GitHub Bot commented on DRILL-8371:
---------------------------------------
jnturton commented on code in PR #2722:
URL: https://github.com/apache/drill/pull/2722#discussion_r1055192926
##########
contrib/storage-splunk/src/main/java/org/apache/drill/exec/store/splunk/SplunkBatchWriter.java:
##########
@@ -98,27 +100,69 @@ public void updateSchema(VectorAccessible batch) {
@Override
public void startRecord() {
logger.debug("Starting record");
- // Ensure that the new record is empty. This is not strictly necessary, but it is a belt and suspenders approach.
- splunkEvent.clear();
+ // Ensure that the new record is empty.
+ splunkEvent = new JSONObject();
}
@Override
- public void endRecord() throws IOException {
+ public void endRecord() {
logger.debug("Ending record");
+ recordCount++;
+
+ // Put event in buffer
+ eventBuffer.add(splunkEvent);
+
// Write the event to the Splunk index
- destinationIndex.submit(eventArgs, splunkEvent.toJSONString());
- // Clear out the splunk event.
- splunkEvent.clear();
+ if (recordCount >= config.getPluginConfig().getWriterBatchSize()) {
+ try {
+ writeEvents();
+ } catch (IOException e) {
+ throw UserException.dataWriteError(e)
+ .message("Error writing data to Splunk: " + e.getMessage())
+ .build(logger);
+ }
+
+ // Reset record count
+ recordCount = 0;
+ }
}
+
+ /*
+ args – Optional arguments for this stream. Valid parameters are: "host", "host_regex", "source", and "sourcetype".
+ */
@Override
public void abort() {
+ logger.debug("Aborting writing records to Splunk.");
// No op
}
@Override
public void cleanup() {
- // No op
+ try {
+ writeEvents();
+ } catch (IOException e) {
+ throw UserException.dataWriteError(e)
+ .message("Error writing data to Splunk: " + e.getMessage())
+ .build(logger);
+ }
+ }
+
+ private void writeEvents() throws IOException {
+ // Open the socket and stream, set up a timestamp
+ destinationIndex.attachWith(new ReceiverBehavior() {
Review Comment:
This results in a dedicated TCP socket being opened and closed for every
writer batch.
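The reviewer's concern can be illustrated with a small, self-contained model. Note this is a sketch, not the Splunk SDK: `CountingSink`, `PerBatchWriter`, and `LongLivedWriter` are hypothetical names invented here to contrast opening a connection per flushed batch (the attachWith-per-batch pattern flagged above) with holding one connection for the writer's lifetime.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sink that counts how often a connection is opened.
class CountingSink {
  int opens = 0;
  final List<String> written = new ArrayList<>();

  void open() { opens++; }

  void writeBatch(List<String> events) { written.addAll(events); }
}

// Models the reviewed code: a new connection for every writer batch.
class PerBatchWriter {
  private final CountingSink sink;
  PerBatchWriter(CountingSink sink) { this.sink = sink; }

  void flush(List<String> buffer) {
    sink.open();              // one socket per batch
    sink.writeBatch(buffer);
    buffer.clear();
  }
}

// Alternative: connect once and reuse the connection across batches.
class LongLivedWriter {
  private final CountingSink sink;
  LongLivedWriter(CountingSink sink) {
    this.sink = sink;
    sink.open();              // connect once for the writer's lifetime
  }

  void flush(List<String> buffer) {
    sink.writeBatch(buffer);  // reuse the existing connection
    buffer.clear();
  }
}
```

With two batches, the per-batch writer opens two connections while the long-lived writer opens one; the gap grows linearly with the number of batches.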
##########
contrib/storage-splunk/src/main/java/org/apache/drill/exec/store/splunk/SplunkBatchWriter.java:
##########
@@ -98,27 +100,69 @@ public void updateSchema(VectorAccessible batch) {
@Override
public void startRecord() {
logger.debug("Starting record");
- // Ensure that the new record is empty. This is not strictly necessary, but it is a belt and suspenders approach.
- splunkEvent.clear();
+ // Ensure that the new record is empty.
+ splunkEvent = new JSONObject();
}
@Override
- public void endRecord() throws IOException {
+ public void endRecord() {
logger.debug("Ending record");
+ recordCount++;
+
+ // Put event in buffer
+ eventBuffer.add(splunkEvent);
+
// Write the event to the Splunk index
- destinationIndex.submit(eventArgs, splunkEvent.toJSONString());
- // Clear out the splunk event.
- splunkEvent.clear();
+ if (recordCount >= config.getPluginConfig().getWriterBatchSize()) {
+ try {
+ writeEvents();
+ } catch (IOException e) {
+ throw UserException.dataWriteError(e)
+ .message("Error writing data to Splunk: " + e.getMessage())
+ .build(logger);
+ }
+
+ // Reset record count
+ recordCount = 0;
+ }
}
+
+ /*
+ args – Optional arguments for this stream. Valid parameters are: "host", "host_regex", "source", and "sourcetype".
+ */
@Override
public void abort() {
+ logger.debug("Aborting writing records to Splunk.");
Review Comment:
Would there be any use in clearing eventBuffer here?
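One reason clearing could matter, sketched below with a minimal self-contained model (hypothetical class and field names; the actual SplunkBatchWriter may differ): if abort() leaves eventBuffer populated, a later cleanup() call would still flush rows from the aborted query. Clearing the buffer on abort prevents that.

```java
import java.util.ArrayList;
import java.util.List;

// Minimal model of the writer's buffer lifecycle. Without the clear()
// in abort(), cleanup() would flush events from the failed query.
class AbortableBatchWriter {
  final List<String> eventBuffer = new ArrayList<>();
  final List<String> flushed = new ArrayList<>();

  void endRecord(String event) { eventBuffer.add(event); }

  void abort() {
    // Discard buffered events so a later cleanup() cannot flush them.
    eventBuffer.clear();
  }

  void cleanup() {
    flushed.addAll(eventBuffer);  // stands in for writeEvents()
    eventBuffer.clear();
  }
}
```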
> Add Write/Append Capability to Splunk Plugin
> --------------------------------------------
>
> Key: DRILL-8371
> URL: https://issues.apache.org/jira/browse/DRILL-8371
> Project: Apache Drill
> Issue Type: Improvement
> Components: Storage - Splunk
> Affects Versions: 1.20.2
> Reporter: Charles Givre
> Assignee: Charles Givre
> Priority: Major
> Fix For: 2.0.0
>
>
> While Drill can currently read from Splunk indexes, it cannot write to them
> or create them. This proposed PR adds support for CTAS queries for Splunk as
> well as INSERT and DROP TABLE.
--
This message was sent by Atlassian Jira
(v8.20.10#820010)