sohami commented on a change in pull request #1459: DRILL-6731: Move the BFs aggregating work from the Foreman to the RuntimeFi…
URL: https://github.com/apache/drill/pull/1459#discussion_r218202111
 
 

 ##########
 File path: 
exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterSink.java
 ##########
 @@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.work.filter;
+
+import org.apache.drill.exec.rpc.NamedThreadFactory;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * This sink receives the RuntimeFilters from the netty thread,
+ * aggregates them in an async thread, supplies the aggregated
+ * one to the fragment running thread.
+ */
+public class RuntimeFilterSink implements AutoCloseable {
 
 Review comment:
   There are a few race conditions because of the way this class is implemented. 
The `RuntimeFilterSink` is accessed from both the Netty thread and the 
`FragmentExecutor` thread. The Netty thread simply adds each received 
`RuntimeFilterWritable` to the queue and is done with it.
   The races arise mainly between the `AsyncAggregateWorker` thread and the 
`FragmentExecutor` thread: the async thread may be updating the shared 
`aggregated` instance while the fragment executor thread is using that same 
instance, assuming it still holds the older filter (specifically, the underlying 
Bloom filter `DrillBuf`). `close()` has problems as well: the async thread may 
have just received another runtime filter, and `close()` then updates the 
running state, closes the `aggregated` instance, and assumes the queue is empty, 
while the async thread goes on to `aggregate` the filter it received.
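   To make the `close()` race concrete, here is a minimal, self-contained sketch. `CloseRaceDemo`, `NaiveSink`, and `Filter` are hypothetical stand-ins, not Drill classes; the two latches only force the unlucky interleaving (worker polls, `close()` runs, worker aggregates) so that it happens on every run instead of occasionally.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;

public class CloseRaceDemo {

  // Hypothetical stand-in for the aggregated filter and its buffer.
  static final class Filter {
    volatile boolean closed = false;
    volatile boolean usedAfterClose = false;

    void aggregate(Filter other) {
      if (closed) {
        usedAfterClose = true; // in Drill this would touch a released DrillBuf
      }
    }

    void close() {
      closed = true;
    }
  }

  // Hypothetical naive sink: a state flag and a non-blocking queue, no lock.
  static final class NaiveSink {
    final Queue<Filter> queue = new ConcurrentLinkedQueue<>();
    final AtomicBoolean running = new AtomicBoolean(true);
    final Filter aggregated = new Filter();

    void close() {
      running.set(false);
      if (queue.isEmpty()) {
        aggregated.close(); // looks safe: nothing left to aggregate
      }
    }
  }

  // Forces the interleaving from the review with latches. Returns true if
  // the worker aggregated into an instance that close() had already closed.
  public static boolean demonstrate() {
    NaiveSink sink = new NaiveSink();
    sink.queue.add(new Filter());
    CountDownLatch polled = new CountDownLatch(1);
    CountDownLatch closed = new CountDownLatch(1);

    Thread worker = new Thread(() -> {
      if (!sink.running.get()) {
        return; // state check passes: the sink is still running
      }
      Filter next = sink.queue.poll(); // worker has just received a filter
      polled.countDown();
      try {
        closed.await(); // close() runs inside this window
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        return;
      }
      sink.aggregated.aggregate(next); // too late: aggregated is closed
    });

    try {
      worker.start();
      polled.await();
      sink.close(); // the queue is empty, so close() releases the aggregate
      closed.countDown();
      worker.join();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    }
    return sink.aggregated.usedAfterClose;
  }

  public static void main(String[] args) {
    System.out.println("aggregate after close: " + demonstrate());
  }
}
```

   Running `main` prints `aggregate after close: true`. Checking `running` before polling does not help, because `close()` can run between that check and the aggregation.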
   
   Please define a clean contract for this class. A few things to consider:
   
   - The async aggregation thread can be started when the `RuntimeFilterSink` is 
created.
   - Consider using a `BlockingQueue`, since the async thread should block until 
the next item becomes available rather than spinning on a state flag.
   - Access to the shared resource `RuntimeFilterWritable aggregated` needs to be 
protected by a lock.
   - After retrieving an element from the queue, and before aggregating it, the 
async thread should check the `running` state. If `running` is false, it should 
`clear` the polled element.
   - This class should return just the Bloom filter list and `fieldList` rather 
than the entire aggregated `RuntimeFilterWritable`, since the caller could 
modify the latter through its setter methods.
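
   The points above could fit together roughly as follows. This is only a sketch under stated assumptions, not a proposed patch: `SafeFilterSink` and its nested `Filter` are hypothetical stand-ins for `RuntimeFilterSink` and `RuntimeFilterWritable`, each Bloom filter is modeled as a single `long` bitset, and `clear()` stands in for releasing the `DrillBuf`. Merging by bitwise OR assumes all filters share the same size and hash functions, and a plain `synchronized` monitor plays the role of the lock.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Function;

public class SafeFilterSink implements AutoCloseable {

  // Hypothetical stand-in for RuntimeFilterWritable (not Drill's API).
  public static final class Filter {
    final List<Long> bloomFilters;
    final List<String> fieldList;
    public Filter(List<Long> bloomFilters, List<String> fieldList) {
      this.bloomFilters = new ArrayList<>(bloomFilters);
      this.fieldList = new ArrayList<>(fieldList);
    }
    void clear() { bloomFilters.clear(); } // stands in for releasing the DrillBuf
  }

  private final BlockingQueue<Filter> queue = new LinkedBlockingQueue<>();
  private final Object lock = new Object(); // guards running and aggregated
  private final Thread worker;
  private boolean running = true;
  private Filter aggregated;

  public SafeFilterSink() {
    worker = new Thread(this::aggregateLoop, "runtime-filter-aggregator");
    worker.start(); // the aggregation thread starts together with the sink
  }

  // Receiving (Netty) thread: enqueue and return; release if already closed.
  public void add(Filter filter) {
    synchronized (lock) {
      if (running) {
        queue.add(filter);
        return;
      }
    }
    filter.clear();
  }

  private void aggregateLoop() {
    while (true) {
      Filter next;
      try {
        next = queue.take(); // block until a filter arrives; no spinning
      } catch (InterruptedException e) {
        return; // close() interrupts the worker to stop it
      }
      synchronized (lock) {
        if (!running) { // closed after take(): drop the polled filter
          next.clear();
          return;
        }
        if (aggregated == null) {
          aggregated = next;
          continue;
        }
        // OR-merge, assuming equal sizes and identical hash functions.
        for (int i = 0; i < aggregated.bloomFilters.size(); i++) {
          aggregated.bloomFilters.set(i, aggregated.bloomFilters.get(i) | next.bloomFilters.get(i));
        }
      }
      next.clear();
    }
  }

  // Fragment thread: hand out read-only copies, never the shared instance.
  public List<Long> getBloomFilters() { return snapshot(f -> f.bloomFilters); }
  public List<String> getFieldList() { return snapshot(f -> f.fieldList); }
  private <T> List<T> snapshot(Function<Filter, List<T>> part) {
    synchronized (lock) {
      return aggregated == null ? Collections.<T>emptyList()
          : Collections.unmodifiableList(new ArrayList<>(part.apply(aggregated)));
    }
  }

  @Override
  public void close() {
    synchronized (lock) {
      running = false; // from now on add() cannot enqueue
      if (aggregated != null) {
        aggregated.clear();
        aggregated = null;
      }
    }
    worker.interrupt();
    try {
      worker.join();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
    for (Filter f = queue.poll(); f != null; f = queue.poll()) {
      f.clear(); // release filters the worker never took
    }
  }
}
```

   Taking the same lock in `add()` means no filter can be enqueued once `close()` has cleared `running`, so the final drain in `close()` cannot miss one; the cost is that the Netty thread may briefly wait while an aggregation holds the lock. Interrupting the worker wakes it from `take()`; a poison-pill element would work too.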

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services
