IMPALA-6019: Remove dead code parallel-executor* Change-Id: I7f0b121c2c40849937afa103dfedf0e0bef34477 Reviewed-on: http://gerrit.cloudera.org:8080/8206 Reviewed-by: Dan Hecht <[email protected]> Tested-by: Impala Public Jenkins
Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/ec957456 Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/ec957456 Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/ec957456 Branch: refs/heads/master Commit: ec957456d26cf0935654e8a70dbc1f36e8c1adae Parents: d40047a Author: Dan Hecht <[email protected]> Authored: Wed Oct 4 10:28:44 2017 -0700 Committer: Impala Public Jenkins <[email protected]> Committed: Fri Oct 6 03:02:05 2017 +0000 ---------------------------------------------------------------------- be/src/runtime/CMakeLists.txt | 2 - be/src/runtime/parallel-executor-test.cc | 82 --------------------------- be/src/runtime/parallel-executor.cc | 66 --------------------- be/src/runtime/parallel-executor.h | 67 ---------------------- 4 files changed, 217 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ec957456/be/src/runtime/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/be/src/runtime/CMakeLists.txt b/be/src/runtime/CMakeLists.txt index 9c655ec..faed531 100644 --- a/be/src/runtime/CMakeLists.txt +++ b/be/src/runtime/CMakeLists.txt @@ -49,7 +49,6 @@ add_library(Runtime mem-tracker.cc mem-pool.cc multi-precision.cc - parallel-executor.cc query-exec-mgr.cc query-state.cc test-env.cc @@ -83,7 +82,6 @@ ADD_BE_TEST(string-buffer-test) ADD_BE_TEST(data-stream-test) ADD_BE_TEST(timestamp-test) ADD_BE_TEST(disk-io-mgr-test) -ADD_BE_TEST(parallel-executor-test) ADD_BE_TEST(raw-value-test) ADD_BE_TEST(string-compare-test) ADD_BE_TEST(string-search-test) http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ec957456/be/src/runtime/parallel-executor-test.cc ---------------------------------------------------------------------- diff --git a/be/src/runtime/parallel-executor-test.cc b/be/src/runtime/parallel-executor-test.cc deleted file mode 100644 index 8347939..0000000 --- a/be/src/runtime/parallel-executor-test.cc +++ /dev/null @@ -1,82 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - - -#include <string> -#include <boost/bind.hpp> - -#include "runtime/parallel-executor.h" -#include "testutil/gtest-util.h" -#include "util/thread.h" - -#include "common/names.h" - -using namespace impala; - -namespace impala { - -class ParallelExecutorTest { - public: - Status UpdateFunction(void* value) { - long arg = reinterpret_cast<long>(value); - EXPECT_FALSE(updates_found_[arg]); - updates_found_[arg] = true; - - double result = 0; - // Run something random to keep this cpu a little busy - for (int i = 0; i < 10000; ++i) { - for (int j = 0; j < 200; ++j) { - result += sin(i) + cos(j); - } - } - - return Status::OK(); - } - - ParallelExecutorTest(int num_updates) { - updates_found_.resize(num_updates); - } - - void Validate() { - for (int i = 0; i < updates_found_.size(); ++i) { - EXPECT_TRUE(updates_found_[i]); - } - } - - private: - vector<int> updates_found_; -}; - -TEST(ParallelExecutorTest, Basic) { - int num_work_items = 100; - ParallelExecutorTest test_caller(num_work_items); - - vector<long> args; - for (int i = 0; i < num_work_items; ++i) { - args.push_back(i); - } - - EXPECT_OK(ParallelExecutor::Exec( - bind<Status>(mem_fn(&ParallelExecutorTest::UpdateFunction), &test_caller, _1), - reinterpret_cast<void**>(args.data()), args.size())); - - test_caller.Validate(); -} - -} - -IMPALA_TEST_MAIN(); http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ec957456/be/src/runtime/parallel-executor.cc ---------------------------------------------------------------------- diff --git a/be/src/runtime/parallel-executor.cc b/be/src/runtime/parallel-executor.cc deleted file mode 100644 index f3dd708..0000000 --- a/be/src/runtime/parallel-executor.cc +++ /dev/null @@ -1,66 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -#include "runtime/parallel-executor.h" - -#include <boost/thread/thread.hpp> - -#include "util/stopwatch.h" -#include "util/thread.h" - -#include "common/names.h" - -using namespace impala; - -Status ParallelExecutor::Exec(Function function, void** args, int num_args, - StatsMetric<double>* latencies) { - Status status; - ThreadGroup worker_threads; - mutex lock; - - for (int i = 0; i < num_args; ++i) { - stringstream ss; - ss << "worker-thread(" << i << ")"; - std::unique_ptr<Thread> t; - Status thread_status = Thread::Create("parallel-executor", ss.str(), - &ParallelExecutor::Worker, function, args[i], &lock, &status, latencies, &t); - if (!thread_status.ok()) { - unique_lock<mutex> l(lock); - status = thread_status; - break; - } - worker_threads.AddThread(move(t)); - } - worker_threads.JoinAll(); - - return status; -} - -void ParallelExecutor::Worker(Function function, void* arg, mutex* lock, Status* status, - StatsMetric<double>* latencies) { - MonotonicStopWatch sw; - if (latencies != NULL) sw.Start(); - Status local_status = function(arg); - if (!local_status.ok()) { - unique_lock<mutex> l(*lock); - if (status->ok()) *status = local_status; - } - - if (latencies != NULL) { - latencies->Update(sw.ElapsedTime() / (1000.0 * 1000.0 * 1000.0)); - } -} http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ec957456/be/src/runtime/parallel-executor.h ---------------------------------------------------------------------- diff --git a/be/src/runtime/parallel-executor.h b/be/src/runtime/parallel-executor.h deleted file mode 100644 index ceb4c66..0000000 --- a/be/src/runtime/parallel-executor.h +++ /dev/null @@ -1,67 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - - -#ifndef IMPALA_RUNTIME_PARALLEL_EXECUTOR_H -#define IMPALA_RUNTIME_PARALLEL_EXECUTOR_H - -#include <boost/function.hpp> -#include <boost/thread/mutex.hpp> - -#include "common/status.h" -#include "util/collection-metrics.h" - -namespace impala { - -/// This is a class that executes multiple functions in parallel with different arguments -/// using a thread pool. -/// TODO: look into an API for this. Boost has one that is in review but not yet official. -/// TODO: use a shared pool? Thread creation is pretty cheap so this might not be -/// worth it -/// TODO: Consider rewriting in terms of ThreadPool -class ParallelExecutor { - public: - /// Typedef for the underlying function for the work. - /// The function must be thread safe. - /// The function must return a Status indicating if it was successful or not. - /// An example of how this function should be defined would be: - /// static Status Foo::IssueRpc(void* arg); - /// TODO: there might some magical template way to do this with boost that is more - /// type safe. - typedef boost::function<Status (void* arg)> Function; - - /// Calls function(args[i]) num_args times in parallel using num_args threads. - /// If any of the work item fails, returns the Status of the first failed work item. - /// Otherwise, returns Status::OK when all work items have been executed. - // - /// Callers may pass a StatsMetric to gather the latency distribution of task execution. - static Status Exec(Function function, void** args, int num_args, - StatsMetric<double>* latencies = NULL); - - private: - /// Worker thread function which calls function(arg). This function updates - /// *status taking *lock to synchronize results from different threads. - // - /// If 'latencies' is not NULL, it is updated with the time elapsed while executing - /// 'function'. - static void Worker(Function function, void* arg, boost::mutex* lock, Status* status, - StatsMetric<double>* latencies); -}; - -} - -#endif
