http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5dcf139e/asterixdb/asterix-installer/src/test/resources/transactionts/scripts/recovery_ddl/load_after_recovery/kill_cc_and_nc.sh
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-installer/src/test/resources/transactionts/scripts/recovery_ddl/load_after_recovery/kill_cc_and_nc.sh
 
b/asterixdb/asterix-installer/src/test/resources/transactionts/scripts/recovery_ddl/load_after_recovery/kill_cc_and_nc.sh
deleted file mode 100755
index 4b876be..0000000
--- 
a/asterixdb/asterix-installer/src/test/resources/transactionts/scripts/recovery_ddl/load_after_recovery/kill_cc_and_nc.sh
+++ /dev/null
@@ -1,18 +0,0 @@
-#!/usr/bin/env bash
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-ps -ef | awk '/java.*org\.apache\.hyracks\.control\.[cn]c\.[CN]CDriver/ {print $2}' | xargs -n 1 kill -9

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5dcf139e/asterixdb/asterix-installer/src/test/resources/transactionts/scripts/recovery_ddl/load_after_recovery/stop_and_delete.sh
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-installer/src/test/resources/transactionts/scripts/recovery_ddl/load_after_recovery/stop_and_delete.sh
 
b/asterixdb/asterix-installer/src/test/resources/transactionts/scripts/recovery_ddl/load_after_recovery/stop_and_delete.sh
deleted file mode 100755
index eb1c01e..0000000
--- 
a/asterixdb/asterix-installer/src/test/resources/transactionts/scripts/recovery_ddl/load_after_recovery/stop_and_delete.sh
+++ /dev/null
@@ -1,20 +0,0 @@
-#!/usr/bin/env bash
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-$MANAGIX_HOME/bin/managix stop -n asterix;
-$MANAGIX_HOME/bin/managix delete -n asterix;
-

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5dcf139e/asterixdb/asterix-installer/src/test/resources/transactionts/scripts/recovery_ddl/load_after_recovery/stop_and_start.sh
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-installer/src/test/resources/transactionts/scripts/recovery_ddl/load_after_recovery/stop_and_start.sh
 
b/asterixdb/asterix-installer/src/test/resources/transactionts/scripts/recovery_ddl/load_after_recovery/stop_and_start.sh
deleted file mode 100755
index 2fb80ce..0000000
--- 
a/asterixdb/asterix-installer/src/test/resources/transactionts/scripts/recovery_ddl/load_after_recovery/stop_and_start.sh
+++ /dev/null
@@ -1,20 +0,0 @@
-#!/usr/bin/env bash
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-$MANAGIX_HOME/bin/managix stop -n asterix;
-$MANAGIX_HOME/bin/managix start -n asterix;
-

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5dcf139e/asterixdb/asterix-installer/src/test/resources/transactionts/scripts/recovery_ddl/secondary_index_recovery/create_and_start.sh
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-installer/src/test/resources/transactionts/scripts/recovery_ddl/secondary_index_recovery/create_and_start.sh
 
b/asterixdb/asterix-installer/src/test/resources/transactionts/scripts/recovery_ddl/secondary_index_recovery/create_and_start.sh
deleted file mode 100755
index 789914b..0000000
--- 
a/asterixdb/asterix-installer/src/test/resources/transactionts/scripts/recovery_ddl/secondary_index_recovery/create_and_start.sh
+++ /dev/null
@@ -1,18 +0,0 @@
-#!/usr/bin/env bash
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-$MANAGIX_HOME/bin/managix create -n asterix -c $MANAGIX_HOME/clusters/local/local.xml;

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5dcf139e/asterixdb/asterix-installer/src/test/resources/transactionts/scripts/recovery_ddl/secondary_index_recovery/kill_cc_and_nc.sh
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-installer/src/test/resources/transactionts/scripts/recovery_ddl/secondary_index_recovery/kill_cc_and_nc.sh
 
b/asterixdb/asterix-installer/src/test/resources/transactionts/scripts/recovery_ddl/secondary_index_recovery/kill_cc_and_nc.sh
deleted file mode 100755
index 4b876be..0000000
--- 
a/asterixdb/asterix-installer/src/test/resources/transactionts/scripts/recovery_ddl/secondary_index_recovery/kill_cc_and_nc.sh
+++ /dev/null
@@ -1,18 +0,0 @@
-#!/usr/bin/env bash
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-ps -ef | awk '/java.*org\.apache\.hyracks\.control\.[cn]c\.[CN]CDriver/ {print $2}' | xargs -n 1 kill -9

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5dcf139e/asterixdb/asterix-installer/src/test/resources/transactionts/scripts/recovery_ddl/secondary_index_recovery/stop_and_delete.sh
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-installer/src/test/resources/transactionts/scripts/recovery_ddl/secondary_index_recovery/stop_and_delete.sh
 
b/asterixdb/asterix-installer/src/test/resources/transactionts/scripts/recovery_ddl/secondary_index_recovery/stop_and_delete.sh
deleted file mode 100755
index eb1c01e..0000000
--- 
a/asterixdb/asterix-installer/src/test/resources/transactionts/scripts/recovery_ddl/secondary_index_recovery/stop_and_delete.sh
+++ /dev/null
@@ -1,20 +0,0 @@
-#!/usr/bin/env bash
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-$MANAGIX_HOME/bin/managix stop -n asterix;
-$MANAGIX_HOME/bin/managix delete -n asterix;
-

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5dcf139e/asterixdb/asterix-installer/src/test/resources/transactionts/scripts/recovery_ddl/secondary_index_recovery/stop_and_start.sh
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-installer/src/test/resources/transactionts/scripts/recovery_ddl/secondary_index_recovery/stop_and_start.sh
 
b/asterixdb/asterix-installer/src/test/resources/transactionts/scripts/recovery_ddl/secondary_index_recovery/stop_and_start.sh
deleted file mode 100755
index 2fb80ce..0000000
--- 
a/asterixdb/asterix-installer/src/test/resources/transactionts/scripts/recovery_ddl/secondary_index_recovery/stop_and_start.sh
+++ /dev/null
@@ -1,20 +0,0 @@
-#!/usr/bin/env bash
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-$MANAGIX_HOME/bin/managix stop -n asterix;
-$MANAGIX_HOME/bin/managix start -n asterix;
-

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5dcf139e/asterixdb/asterix-installer/src/test/resources/transactionts/scripts/setup_teardown/configure_and_validate.sh
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-installer/src/test/resources/transactionts/scripts/setup_teardown/configure_and_validate.sh
 
b/asterixdb/asterix-installer/src/test/resources/transactionts/scripts/setup_teardown/configure_and_validate.sh
deleted file mode 100755
index e30be0c..0000000
--- 
a/asterixdb/asterix-installer/src/test/resources/transactionts/scripts/setup_teardown/configure_and_validate.sh
+++ /dev/null
@@ -1,20 +0,0 @@
-#!/usr/bin/env bash
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-$MANAGIX_HOME/bin/managix configure;
-$MANAGIX_HOME/bin/managix validate;
-$MANAGIX_HOME/bin/managix validate -c $MANAGIX_HOME/clusters/local/local.xml;

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5dcf139e/asterixdb/asterix-installer/src/test/resources/transactionts/scripts/setup_teardown/kill_cc_and_nc.sh
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-installer/src/test/resources/transactionts/scripts/setup_teardown/kill_cc_and_nc.sh
 
b/asterixdb/asterix-installer/src/test/resources/transactionts/scripts/setup_teardown/kill_cc_and_nc.sh
deleted file mode 100755
index 4b876be..0000000
--- 
a/asterixdb/asterix-installer/src/test/resources/transactionts/scripts/setup_teardown/kill_cc_and_nc.sh
+++ /dev/null
@@ -1,18 +0,0 @@
-#!/usr/bin/env bash
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-ps -ef | awk '/java.*org\.apache\.hyracks\.control\.[cn]c\.[CN]CDriver/ {print $2}' | xargs -n 1 kill -9

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5dcf139e/asterixdb/asterix-installer/src/test/resources/transactionts/scripts/setup_teardown/shutdown.sh
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-installer/src/test/resources/transactionts/scripts/setup_teardown/shutdown.sh
 
b/asterixdb/asterix-installer/src/test/resources/transactionts/scripts/setup_teardown/shutdown.sh
deleted file mode 100755
index e46b4e5..0000000
--- 
a/asterixdb/asterix-installer/src/test/resources/transactionts/scripts/setup_teardown/shutdown.sh
+++ /dev/null
@@ -1,18 +0,0 @@
-#!/usr/bin/env bash
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-$MANAGIX_HOME/bin/managix shutdown;

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5dcf139e/asterixdb/asterix-installer/src/test/resources/transactionts/scripts/setup_teardown/stop_and_delete.sh
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-installer/src/test/resources/transactionts/scripts/setup_teardown/stop_and_delete.sh
 
b/asterixdb/asterix-installer/src/test/resources/transactionts/scripts/setup_teardown/stop_and_delete.sh
deleted file mode 100755
index eb1c01e..0000000
--- 
a/asterixdb/asterix-installer/src/test/resources/transactionts/scripts/setup_teardown/stop_and_delete.sh
+++ /dev/null
@@ -1,20 +0,0 @@
-#!/usr/bin/env bash
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-$MANAGIX_HOME/bin/managix stop -n asterix;
-$MANAGIX_HOME/bin/managix delete -n asterix;
-

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5dcf139e/asterixdb/asterix-installer/src/test/resources/transactionts/testsuite.xml
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-installer/src/test/resources/transactionts/testsuite.xml 
b/asterixdb/asterix-installer/src/test/resources/transactionts/testsuite.xml
deleted file mode 100644
index 38179b2..0000000
--- a/asterixdb/asterix-installer/src/test/resources/transactionts/testsuite.xml
+++ /dev/null
@@ -1,219 +0,0 @@
-<!--
- ! Licensed to the Apache Software Foundation (ASF) under one
- ! or more contributor license agreements.  See the NOTICE file
- ! distributed with this work for additional information
- ! regarding copyright ownership.  The ASF licenses this file
- ! to you under the Apache License, Version 2.0 (the
- ! "License"); you may not use this file except in compliance
- ! with the License.  You may obtain a copy of the License at
- !
- !   http://www.apache.org/licenses/LICENSE-2.0
- !
- ! Unless required by applicable law or agreed to in writing,
- ! software distributed under the License is distributed on an
- ! "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- ! KIND, either express or implied.  See the License for the
- ! specific language governing permissions and limitations
- ! under the License.
- !-->
-<test-suite xmlns="urn:xml.testframework.asterix.apache.org" ResultOffsetPath="results" QueryOffsetPath="queries" QueryFileExtension=".aql">
-  <test-group name="query_after_restart">
-    <test-case FilePath="query_after_restart">
-      <compilation-unit name="dataset-with-meta-record">
-        <output-dir compare="Text">dataset-with-meta-record</output-dir>
-      </compilation-unit>
-    </test-case>
-    <test-case FilePath="query_after_restart">
-      <compilation-unit name="external_index">
-        <output-dir compare="Text">external_index</output-dir>
-      </compilation-unit>
-    </test-case>
-    <test-case FilePath="query_after_restart">
-      <compilation-unit name="big_object_20M">
-        <output-dir compare="Text">big_object_20M</output-dir>
-      </compilation-unit>
-    </test-case>
-  </test-group>
-  <test-group name="dml_after_restart">
-    <test-case FilePath="dml_after_restart">
-      <compilation-unit name="multiple_secondary_indices">
-        <output-dir compare="Text">multiple_secondary_indices</output-dir>
-      </compilation-unit>
-    </test-case>
-  </test-group>
-  <test-group name="recover_after_abort">
-
-    <test-case FilePath="recover_after_abort">
-      <compilation-unit name="primary_index_only">
-        <output-dir compare="Text">primary_index_only</output-dir>
-      </compilation-unit>
-    </test-case>
-
-    <test-case FilePath="recover_after_abort">
-      <compilation-unit name="primary_plus_default_secondary_index">
-        <output-dir compare="Text">primary_plus_default_secondary_index</output-dir>
-      </compilation-unit>
-    </test-case>
-
-    <test-case FilePath="recover_after_abort">
-      <compilation-unit name="primary_plus_default_correlated_secondary_index">
-        <output-dir compare="Text">primary_plus_default_secondary_index</output-dir>
-      </compilation-unit>
-    </test-case>
-
-    <test-case FilePath="recover_after_abort">
-      <compilation-unit name="primary_plus_rtree_index">
-        <output-dir compare="Text">primary_plus_rtree_index</output-dir>
-      </compilation-unit>
-    </test-case>
-
-    <test-case FilePath="recover_after_abort">
-      <compilation-unit name="primary_plus_correlated_rtree_index">
-        <output-dir compare="Text">primary_plus_rtree_index</output-dir>
-      </compilation-unit>
-    </test-case>
-
-    <test-case FilePath="recover_after_abort">
-      <compilation-unit name="primary_plus_rtree_index_insert_and_delete">
-        <output-dir compare="Text">primary_plus_rtree_index_insert_and_delete</output-dir>
-      </compilation-unit>
-    </test-case>
-
-    <test-case FilePath="recover_after_abort">
-      <compilation-unit name="primary_plus_keyword_secondary_index">
-        <output-dir compare="Text">primary_plus_keyword_secondary_index</output-dir>
-      </compilation-unit>
-    </test-case>
-
-    <test-case FilePath="recover_after_abort">
-      <compilation-unit name="primary_plus_keyword_correlated_secondary_index">
-        <output-dir compare="Text">primary_plus_keyword_secondary_index</output-dir>
-      </compilation-unit>
-    </test-case>
-
-    <test-case FilePath="recover_after_abort">
-      <compilation-unit name="primary_plus_ngram_index">
-        <output-dir compare="Text">primary_plus_ngram_index</output-dir>
-      </compilation-unit>
-    </test-case>
-
-    <test-case FilePath="recover_after_abort">
-      <compilation-unit name="primary_plus_correlated_ngram_index">
-        <output-dir compare="Text">primary_plus_ngram_index</output-dir>
-      </compilation-unit>
-    </test-case>
-
-    <test-case FilePath="recover_after_abort">
-      <compilation-unit name="primary_plus_multiple_secondary_indices">
-        <output-dir compare="Text">primary_plus_multiple_secondary_indices</output-dir>
-      </compilation-unit>
-    </test-case>
-
-    <test-case FilePath="recover_after_abort">
-      <compilation-unit name="primary_plus_multiple_correlated_secondary_indices">
-        <output-dir compare="Text">primary_plus_multiple_secondary_indices</output-dir>
-      </compilation-unit>
-    </test-case>
-
-    <test-case FilePath="recover_after_abort">
-      <compilation-unit name="primary_index_only_filtered">
-        <output-dir compare="Text">primary_index_only_filtered</output-dir>
-      </compilation-unit>
-    </test-case>
-
-    <test-case FilePath="recover_after_abort">
-      <compilation-unit name="primary_plus_default_secondary_index_filtered">
-        <output-dir compare="Text">primary_plus_default_secondary_index_filtered</output-dir>
-      </compilation-unit>
-    </test-case>
-
-    <test-case FilePath="recover_after_abort">
-      <compilation-unit name="primary_plus_rtree_index_filtered">
-        <output-dir compare="Text">primary_plus_rtree_index_filtered</output-dir>
-      </compilation-unit>
-    </test-case>
-
-    <test-case FilePath="recover_after_abort">
-      <compilation-unit name="primary_plus_rtree_index_insert_and_delete_filtered">
-        <output-dir compare="Text">primary_plus_rtree_index_insert_and_delete_filtered</output-dir>
-      </compilation-unit>
-    </test-case>
-
-    <test-case FilePath="recover_after_abort">
-      <compilation-unit name="primary_plus_keyword_secondary_index_filtered">
-        <output-dir compare="Text">primary_plus_keyword_secondary_index_filtered</output-dir>
-      </compilation-unit>
-    </test-case>
-
-    <test-case FilePath="recover_after_abort">
-      <compilation-unit name="primary_plus_ngram_index">
-        <output-dir compare="Text">primary_plus_ngram_index_filtered</output-dir>
-      </compilation-unit>
-    </test-case>
-
-    <test-case FilePath="recover_after_abort">
-      <compilation-unit name="primary_plus_multiple_secondary_indices">
-        <output-dir compare="Text">primary_plus_multiple_secondary_indices_filtered</output-dir>
-      </compilation-unit>
-    </test-case>
-
-    <test-case FilePath="recover_after_abort">
-      <compilation-unit name="primary_plus_multiple_secondary_indices"><!-- The only exception here is during the kill command which is in a different JVM, hence not caught -->
-        <output-dir compare="Text">primary_plus_multiple_secondary_indices</output-dir>
-        <!-- <expected-error>org.apache.hyracks.algebricks.common.exceptions.AlgebricksException</expected-error> -->
-      </compilation-unit>
-    </test-case>
-  </test-group>
-
-  <test-group name="recovery_ddl">
-
-    <test-case FilePath="recovery_ddl">
-      <compilation-unit name="dataverse_recovery">
-        <output-dir compare="Text">dataverse_recovery</output-dir>
-      </compilation-unit>
-    </test-case>
-
-    <test-case FilePath="recovery_ddl">
-      <compilation-unit name="datatype_recovery">
-        <output-dir compare="Text">datatype_recovery</output-dir>
-      </compilation-unit>
-    </test-case>
-
-    <test-case FilePath="recovery_ddl">
-      <compilation-unit name="dataset_recovery">
-        <output-dir compare="Text">dataset_recovery</output-dir>
-      </compilation-unit>
-    </test-case>
-
-    <test-case FilePath="recovery_ddl">
-      <compilation-unit name="secondary_index_recovery">
-        <output-dir compare="Text">secondary_index_recovery</output-dir>
-      </compilation-unit>
-    </test-case>
-
-    <test-case FilePath="recovery_ddl">
-      <compilation-unit name="load_after_recovery">
-        <output-dir compare="Text">load_after_recovery</output-dir>
-      </compilation-unit>
-    </test-case>
-
-    <test-case FilePath="recovery_ddl">
-      <compilation-unit name="insert_after_recovery">
-        <output-dir compare="Text">insert_after_recovery</output-dir>
-      </compilation-unit>
-    </test-case>
-
-    <test-case FilePath="recovery_ddl">
-      <compilation-unit name="delete_after_recovery">
-        <output-dir compare="Text">delete_after_recovery</output-dir>
-      </compilation-unit>
-    </test-case>
-
-    <test-case FilePath="recovery_ddl">
-      <compilation-unit name="function_recovery">
-        <output-dir compare="Text">function_recovery</output-dir>
-      </compilation-unit>
-    </test-case>
-  </test-group>
-
-</test-suite>

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5dcf139e/asterixdb/asterix-metadata/pom.xml
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-metadata/pom.xml 
b/asterixdb/asterix-metadata/pom.xml
index 9784c5e..606b474 100644
--- a/asterixdb/asterix-metadata/pom.xml
+++ b/asterixdb/asterix-metadata/pom.xml
@@ -56,12 +56,6 @@
       <scope>compile</scope>
     </dependency>
     <dependency>
-      <groupId>org.apache.asterix</groupId>
-      <artifactId>asterix-events</artifactId>
-      <version>${project.version}</version>
-      <scope>compile</scope>
-    </dependency>
-    <dependency>
       <groupId>org.apache.hyracks</groupId>
       <artifactId>hyracks-storage-am-lsm-invertedindex</artifactId>
     </dependency>
@@ -118,10 +112,6 @@
       <artifactId>algebricks-core</artifactId>
     </dependency>
     <dependency>
-      <groupId>javax.xml.bind</groupId>
-      <artifactId>jaxb-api</artifactId>
-    </dependency>
-    <dependency>
       <groupId>org.apache.commons</groupId>
       <artifactId>commons-lang3</artifactId>
     </dependency>
@@ -190,4 +180,4 @@
       <artifactId>hadoop-common</artifactId>
     </dependency>
   </dependencies>
-</project>
\ No newline at end of file
+</project>

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5dcf139e/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/api/IClusterManager.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/api/IClusterManager.java
 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/api/IClusterManager.java
deleted file mode 100644
index 8459f70..0000000
--- 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/api/IClusterManager.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.metadata.api;
-
-import java.util.Set;
-
-import org.apache.asterix.common.api.IClusterEventsSubscriber;
-import org.apache.asterix.common.dataflow.ICcApplicationContext;
-import org.apache.asterix.common.exceptions.AsterixException;
-import org.apache.asterix.event.schema.cluster.Node;
-
-public interface IClusterManager {
-
-    /**
-     * @param node
-     * @throws Exception
-     */
-    public void addNode(ICcApplicationContext appCtx, Node node) throws Exception;
-
-    /**
-     * @param node
-     * @throws Exception
-     */
-    public void removeNode(Node node) throws Exception;
-
-    /**
-     * @param subscriber
-     */
-    public void registerSubscriber(IClusterEventsSubscriber subscriber);
-
-    /**
-     * @param sunscriber
-     * @return
-     */
-    public boolean deregisterSubscriber(IClusterEventsSubscriber sunscriber);
-
-    /**
-     * @return
-     */
-    public Set<IClusterEventsSubscriber> getRegisteredClusterEventSubscribers();
-
-    void notifyStartupCompleted() throws Exception;
-}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5dcf139e/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataBootstrap.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataBootstrap.java
 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataBootstrap.java
index 0303392..bf8079e 100644
--- 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataBootstrap.java
+++ 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataBootstrap.java
@@ -65,6 +65,7 @@ import org.apache.asterix.metadata.utils.MetadataConstants;
 import org.apache.asterix.metadata.utils.MetadataUtil;
 import org.apache.asterix.om.types.IAType;
 import org.apache.asterix.runtime.formats.NonTaggedDataFormat;
+import org.apache.asterix.runtime.utils.ClusterStateManager;
 import 
org.apache.asterix.transaction.management.opcallbacks.PrimaryIndexOperationTrackerFactory;
 import 
org.apache.asterix.transaction.management.opcallbacks.SecondaryIndexOperationTrackerFactory;
 import 
org.apache.asterix.transaction.management.resource.DatasetLocalResourceFactory;
@@ -108,14 +109,14 @@ public class MetadataBootstrap {
     private static String metadataNodeName;
     private static List<String> nodeNames;
     private static boolean isNewUniverse;
-    private static final IMetadataIndex[] PRIMARY_INDEXES =
-            new IMetadataIndex[] { MetadataPrimaryIndexes.DATAVERSE_DATASET, 
MetadataPrimaryIndexes.DATASET_DATASET,
-                    MetadataPrimaryIndexes.DATATYPE_DATASET, 
MetadataPrimaryIndexes.INDEX_DATASET,
-                    MetadataPrimaryIndexes.NODE_DATASET, 
MetadataPrimaryIndexes.NODEGROUP_DATASET,
-                    MetadataPrimaryIndexes.FUNCTION_DATASET, 
MetadataPrimaryIndexes.DATASOURCE_ADAPTER_DATASET,
-                    MetadataPrimaryIndexes.FEED_DATASET, 
MetadataPrimaryIndexes.FEED_POLICY_DATASET,
-                    MetadataPrimaryIndexes.LIBRARY_DATASET, 
MetadataPrimaryIndexes.COMPACTION_POLICY_DATASET,
-                    MetadataPrimaryIndexes.EXTERNAL_FILE_DATASET, 
MetadataPrimaryIndexes.FEED_CONNECTION_DATASET };
+    private static final IMetadataIndex[] PRIMARY_INDEXES = new 
IMetadataIndex[] {
+            MetadataPrimaryIndexes.DATAVERSE_DATASET, 
MetadataPrimaryIndexes.DATASET_DATASET,
+            MetadataPrimaryIndexes.DATATYPE_DATASET, 
MetadataPrimaryIndexes.INDEX_DATASET,
+            MetadataPrimaryIndexes.NODE_DATASET, 
MetadataPrimaryIndexes.NODEGROUP_DATASET,
+            MetadataPrimaryIndexes.FUNCTION_DATASET, 
MetadataPrimaryIndexes.DATASOURCE_ADAPTER_DATASET,
+            MetadataPrimaryIndexes.FEED_DATASET, 
MetadataPrimaryIndexes.FEED_POLICY_DATASET,
+            MetadataPrimaryIndexes.LIBRARY_DATASET, 
MetadataPrimaryIndexes.COMPACTION_POLICY_DATASET,
+            MetadataPrimaryIndexes.EXTERNAL_FILE_DATASET, 
MetadataPrimaryIndexes.FEED_CONNECTION_DATASET };
 
     private MetadataBootstrap() {
     }
@@ -265,9 +266,9 @@ public class MetadataBootstrap {
 
     private static void 
insertInitialCompactionPolicies(MetadataTransactionContext mdTxnCtx)
             throws AlgebricksException {
-        String[] builtInCompactionPolicyClassNames =
-                new String[] { ConstantMergePolicyFactory.class.getName(), 
PrefixMergePolicyFactory.class.getName(),
-                        NoMergePolicyFactory.class.getName(), 
CorrelatedPrefixMergePolicyFactory.class.getName() };
+        String[] builtInCompactionPolicyClassNames = new String[] { 
ConstantMergePolicyFactory.class.getName(),
+                PrefixMergePolicyFactory.class.getName(), 
NoMergePolicyFactory.class.getName(),
+                CorrelatedPrefixMergePolicyFactory.class.getName() };
         for (String policyClassName : builtInCompactionPolicyClassNames) {
             CompactionPolicy compactionPolicy = 
getCompactionPolicyEntity(policyClassName);
             MetadataManager.INSTANCE.addCompactionPolicy(mdTxnCtx, 
compactionPolicy);
@@ -287,8 +288,8 @@ public class MetadataBootstrap {
     private static CompactionPolicy getCompactionPolicyEntity(String 
compactionPolicyClassName)
             throws AlgebricksException {
         try {
-            String policyName =
-                    ((ILSMMergePolicyFactory) 
(Class.forName(compactionPolicyClassName).newInstance())).getName();
+            String policyName = ((ILSMMergePolicyFactory) 
(Class.forName(compactionPolicyClassName).newInstance()))
+                    .getName();
             return new 
CompactionPolicy(MetadataConstants.METADATA_DATAVERSE_NAME, policyName,
                     compactionPolicyClassName);
         } catch (InstantiationException | IllegalAccessException | 
ClassNotFoundException e) {
@@ -325,13 +326,13 @@ public class MetadataBootstrap {
         // We are unable to do this since IStorageManager needs a dataset to 
determine
         // the appropriate
         // objects
-        ILSMOperationTrackerFactory opTrackerFactory =
-                index.isPrimaryIndex() ? new 
PrimaryIndexOperationTrackerFactory(datasetId)
-                        : new SecondaryIndexOperationTrackerFactory(datasetId);
-        ILSMComponentIdGeneratorFactory idGeneratorProvider =
-                new 
DatasetLSMComponentIdGeneratorFactory(index.getDatasetId().getId());
-        ILSMIOOperationCallbackFactory ioOpCallbackFactory =
-                new LSMBTreeIOOperationCallbackFactory(idGeneratorProvider);
+        ILSMOperationTrackerFactory opTrackerFactory = index.isPrimaryIndex()
+                ? new PrimaryIndexOperationTrackerFactory(datasetId)
+                : new SecondaryIndexOperationTrackerFactory(datasetId);
+        ILSMComponentIdGeneratorFactory idGeneratorProvider = new 
DatasetLSMComponentIdGeneratorFactory(
+                index.getDatasetId().getId());
+        ILSMIOOperationCallbackFactory ioOpCallbackFactory = new 
LSMBTreeIOOperationCallbackFactory(
+                idGeneratorProvider);
         IStorageComponentProvider storageComponentProvider = 
appContext.getStorageComponentProvider();
         if (isNewUniverse()) {
             LSMBTreeLocalResourceFactory lsmBtreeFactory = new 
LSMBTreeLocalResourceFactory(
@@ -341,8 +342,8 @@ public class MetadataBootstrap {
                     storageComponentProvider.getIoOperationSchedulerProvider(),
                     appContext.getMetadataMergePolicyFactory(), 
GlobalConfig.DEFAULT_COMPACTION_POLICY_PROPERTIES, true,
                     bloomFilterKeyFields, 
appContext.getBloomFilterFalsePositiveRate(), true, null);
-            DatasetLocalResourceFactory dsLocalResourceFactory =
-                    new DatasetLocalResourceFactory(datasetId, 
lsmBtreeFactory);
+            DatasetLocalResourceFactory dsLocalResourceFactory = new 
DatasetLocalResourceFactory(datasetId,
+                    lsmBtreeFactory);
             // TODO(amoudi) Creating the index should be done through the same 
code path as
             // other indexes
             // This is to be done by having a metadata dataset associated with 
each index
@@ -362,8 +363,8 @@ public class MetadataBootstrap {
             if (index.getResourceId() != resource.getId()) {
                 throw new HyracksDataException("Resource Id doesn't match 
expected metadata index resource id");
             }
-            IndexDataflowHelper indexHelper =
-                    new IndexDataflowHelper(ncServiceCtx, 
storageComponentProvider.getStorageManager(), file);
+            IndexDataflowHelper indexHelper = new 
IndexDataflowHelper(ncServiceCtx,
+                    storageComponentProvider.getStorageManager(), file);
             indexHelper.open(); // Opening the index through the helper will 
ensure it gets instantiated
             indexHelper.close();
         }
@@ -418,8 +419,8 @@ public class MetadataBootstrap {
                 LOGGER.info("Dropped a pending dataverse: " + 
dataverse.getDataverseName());
             }
         } else {
-            List<Dataset> datasets =
-                    MetadataManager.INSTANCE.getDataverseDatasets(mdTxnCtx, 
dataverse.getDataverseName());
+            List<Dataset> datasets = 
MetadataManager.INSTANCE.getDataverseDatasets(mdTxnCtx,
+                    dataverse.getDataverseName());
             for (Dataset dataset : datasets) {
                 recoverDataset(mdTxnCtx, dataset);
             }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5dcf139e/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/cluster/ClusterManager.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/cluster/ClusterManager.java
 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/cluster/ClusterManager.java
deleted file mode 100644
index c1c19a4..0000000
--- 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/cluster/ClusterManager.java
+++ /dev/null
@@ -1,168 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.metadata.cluster;
-
-import java.io.File;
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import javax.xml.bind.JAXBContext;
-import javax.xml.bind.Unmarshaller;
-
-import org.apache.asterix.common.api.IClusterEventsSubscriber;
-import org.apache.asterix.common.api.IClusterManagementWork;
-import org.apache.asterix.common.config.ClusterProperties;
-import org.apache.asterix.common.config.ExternalProperties;
-import org.apache.asterix.common.dataflow.ICcApplicationContext;
-import org.apache.asterix.common.exceptions.AsterixException;
-import org.apache.asterix.event.management.AsterixEventServiceClient;
-import org.apache.asterix.event.model.AsterixInstance;
-import org.apache.asterix.event.schema.cluster.Cluster;
-import org.apache.asterix.event.schema.cluster.Node;
-import org.apache.asterix.event.schema.pattern.Pattern;
-import org.apache.asterix.event.schema.pattern.Patterns;
-import org.apache.asterix.event.service.AsterixEventService;
-import org.apache.asterix.event.service.AsterixEventServiceUtil;
-import org.apache.asterix.event.service.ILookupService;
-import org.apache.asterix.event.service.ServiceProvider;
-import org.apache.asterix.event.util.PatternCreator;
-import org.apache.asterix.installer.schema.conf.Configuration;
-import org.apache.asterix.metadata.api.IClusterManager;
-
-public class ClusterManager implements IClusterManager {
-
-    private static final Logger LOGGER = 
Logger.getLogger(ClusterManager.class.getName());
-
-    public static final IClusterManager INSTANCE = 
ClusterManagerProvider.getClusterManager();
-
-    private final AsterixEventServiceClient client;
-
-    private final ILookupService lookupService;
-
-    private final Set<IClusterEventsSubscriber> eventSubscribers = new 
HashSet<>();
-
-    ClusterManager(String eventHome) {
-        String asterixDir = System.getProperty("user.dir") + File.separator + 
"asterix";
-        File configFile = new File(System.getProperty("user.dir") + 
File.separator + "configuration.xml");
-        Configuration configuration = null;
-
-        try {
-            JAXBContext configCtx = 
JAXBContext.newInstance(Configuration.class);
-            Unmarshaller unmarshaller = configCtx.createUnmarshaller();
-            configuration = (Configuration) unmarshaller.unmarshal(configFile);
-            AsterixEventService.initialize(configuration, asterixDir, 
eventHome);
-            client = 
AsterixEventService.getAsterixEventServiceClient(ClusterProperties.INSTANCE.getCluster());
-
-            lookupService = ServiceProvider.INSTANCE.getLookupService();
-            if (!lookupService.isRunning(configuration)) {
-                if (LOGGER.isLoggable(Level.INFO)) {
-                    LOGGER.info("Lookup service not running. Starting lookup 
service ...");
-                }
-                lookupService.startService(configuration);
-            } else {
-                if (LOGGER.isLoggable(Level.INFO)) {
-                    LOGGER.info("Lookup service running");
-                }
-            }
-
-        } catch (Exception e) {
-            throw new IllegalStateException("Unable to initialize cluster 
manager" + e);
-        }
-    }
-
-    @Override
-    public void addNode(ICcApplicationContext appCtx, Node node) throws 
Exception {
-        Cluster cluster = ClusterProperties.INSTANCE.getCluster();
-        List<Pattern> pattern = new ArrayList<>();
-        String asterixInstanceName = 
appCtx.getMetadataProperties().getInstanceName();
-        Patterns prepareNode = 
PatternCreator.INSTANCE.createPrepareNodePattern(asterixInstanceName,
-                ClusterProperties.INSTANCE.getCluster(), node);
-        cluster.getNode().add(node);
-        client.submit(prepareNode);
-
-        ExternalProperties externalProps = appCtx.getExternalProperties();
-        AsterixEventServiceUtil.poulateClusterEnvironmentProperties(cluster, 
externalProps.getCCJavaParams(),
-                externalProps.getNCJavaParams());
-
-        pattern.clear();
-        String ccHost = cluster.getMasterNode().getClusterIp();
-        String hostId = node.getId();
-        String nodeControllerId = asterixInstanceName + "_" + node.getId();
-        String iodevices = node.getIodevices() == null ? 
cluster.getIodevices() : node.getIodevices();
-        Pattern startNC =
-                PatternCreator.INSTANCE.createNCStartPattern(ccHost, hostId, 
nodeControllerId, iodevices, false);
-        pattern.add(startNC);
-        Patterns startNCPattern = new Patterns(pattern);
-        client.submit(startNCPattern);
-
-        removeNode(cluster.getSubstituteNodes().getNode(), node);
-
-        AsterixInstance instance = 
lookupService.getAsterixInstance(cluster.getInstanceName());
-        instance.getCluster().getNode().add(node);
-        removeNode(instance.getCluster().getSubstituteNodes().getNode(), node);
-        lookupService.updateAsterixInstance(instance);
-    }
-
-    private void removeNode(List<Node> list, Node node) {
-        Node nodeToRemove = null;
-        for (Node n : list) {
-            if (n.getId().equals(node.getId())) {
-                nodeToRemove = n;
-                break;
-            }
-        }
-        if (nodeToRemove != null) {
-            boolean removed = list.remove(nodeToRemove);
-            if (LOGGER.isLoggable(Level.INFO)) {
-                LOGGER.info("attempt to remove node :" + nodeToRemove + " 
successful " + removed);
-            }
-        }
-    }
-
-    @Override
-    public void removeNode(Node node) {
-        // to be implemented later.
-    }
-
-    @Override
-    public void registerSubscriber(IClusterEventsSubscriber subscriber) {
-        eventSubscribers.add(subscriber);
-    }
-
-    @Override
-    public boolean deregisterSubscriber(IClusterEventsSubscriber subscriber) {
-        return eventSubscribers.remove(subscriber);
-    }
-
-    @Override
-    public Set<IClusterEventsSubscriber> 
getRegisteredClusterEventSubscribers() {
-        return eventSubscribers;
-    }
-
-    @Override
-    public void notifyStartupCompleted() throws Exception {
-        // Notify Zookeeper that the startup is complete
-        
lookupService.reportClusterState(ClusterProperties.INSTANCE.getCluster().getInstanceName(),
-                IClusterManagementWork.ClusterState.ACTIVE);
-    }
-}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5dcf139e/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/cluster/ClusterManagerProvider.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/cluster/ClusterManagerProvider.java
 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/cluster/ClusterManagerProvider.java
deleted file mode 100644
index 2b7c9df..0000000
--- 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/cluster/ClusterManagerProvider.java
+++ /dev/null
@@ -1,90 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.metadata.cluster;
-
-import java.util.Collections;
-import java.util.Set;
-
-import org.apache.asterix.common.api.IClusterEventsSubscriber;
-import org.apache.asterix.common.config.ClusterProperties;
-import org.apache.asterix.common.dataflow.ICcApplicationContext;
-import org.apache.asterix.common.exceptions.AsterixException;
-import org.apache.asterix.event.schema.cluster.Cluster;
-import org.apache.asterix.event.schema.cluster.Node;
-import org.apache.asterix.metadata.api.IClusterManager;
-
-public class ClusterManagerProvider {
-
-    private ClusterManagerProvider() {
-    }
-
-    public static IClusterManager getClusterManager() {
-        return Holder.INSTANCE;
-    }
-
-    private static final class Holder {
-        static final IClusterManager INSTANCE;
-
-        static {
-            Cluster asterixCluster = ClusterProperties.INSTANCE.getCluster();
-            String eventHome = asterixCluster == null ? null
-                    : asterixCluster.getWorkingDir() == null ? null : 
asterixCluster.getWorkingDir().getDir();
-
-            if (eventHome != null) {
-                INSTANCE = new ClusterManager(eventHome);
-            } else {
-                INSTANCE = new NoopClusterManager();
-            }
-        }
-
-        private Holder() {
-        }
-    }
-    private static class NoopClusterManager implements IClusterManager {
-        @Override
-        public void addNode(ICcApplicationContext appCtx, Node node) {
-            // no-op
-        }
-
-        @Override
-        public void removeNode(Node node) {
-            // no-op
-        }
-
-        @Override
-        public void registerSubscriber(IClusterEventsSubscriber subscriber) {
-            // no-op
-        }
-
-        @Override
-        public boolean deregisterSubscriber(IClusterEventsSubscriber 
sunscriber) {
-            return true;
-        }
-
-        @Override
-        public Set<IClusterEventsSubscriber> 
getRegisteredClusterEventSubscribers() {
-            return Collections.emptySet();
-        }
-
-        @Override
-        public void notifyStartupCompleted() {
-            // no-op
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5dcf139e/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
index 1e0d597..0398f1a 100644
--- 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
+++ 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
@@ -284,8 +284,8 @@ public class MetadataProvider implements 
IMetadataProvider<DataSourceId, String>
     }
 
     public Dataset findDataset(String dataverse, String dataset) throws 
AlgebricksException {
-        String dv =
-                dataverse == null ? (defaultDataverse == null ? null : 
defaultDataverse.getDataverseName()) : dataverse;
+        String dv = dataverse == null ? (defaultDataverse == null ? null : 
defaultDataverse.getDataverseName())
+                : dataverse;
         if (dv == null) {
             return null;
         }
@@ -409,8 +409,8 @@ public class MetadataProvider implements 
IMetadataProvider<DataSourceId, String>
                         policyAccessor, factoryOutput.second);
                 break;
             case EXTERNAL:
-                String libraryName =
-                        
feed.getAdapterName().trim().split(FeedConstants.NamingConstants.LIBRARY_NAME_SEPARATOR)[0];
+                String libraryName = feed.getAdapterName().trim()
+                        
.split(FeedConstants.NamingConstants.LIBRARY_NAME_SEPARATOR)[0];
                 feedIngestor = new FeedIntakeOperatorDescriptor(jobSpec, feed, 
libraryName,
                         adapterFactory.getClass().getName(), recordType, 
policyAccessor, factoryOutput.second);
                 break;
@@ -433,19 +433,21 @@ public class MetadataProvider implements 
IMetadataProvider<DataSourceId, String>
         if (primaryIndex != null && (dataset.getDatasetType() != 
DatasetType.EXTERNAL)) {
             isSecondary = !indexName.equals(primaryIndex.getIndexName());
         }
-        Index theIndex = isSecondary ? 
MetadataManager.INSTANCE.getIndex(mdTxnCtx, dataset.getDataverseName(),
-                dataset.getDatasetName(), indexName) : primaryIndex;
+        Index theIndex = isSecondary
+                ? MetadataManager.INSTANCE.getIndex(mdTxnCtx, 
dataset.getDataverseName(), dataset.getDatasetName(),
+                        indexName)
+                : primaryIndex;
         int numPrimaryKeys = dataset.getPrimaryKeys().size();
         RecordDescriptor outputRecDesc = 
JobGenHelper.mkRecordDescriptor(typeEnv, opSchema, context);
-        Pair<IFileSplitProvider, AlgebricksPartitionConstraint> spPc =
-                getSplitProviderAndConstraints(dataset, 
theIndex.getIndexName());
+        Pair<IFileSplitProvider, AlgebricksPartitionConstraint> spPc = 
getSplitProviderAndConstraints(dataset,
+                theIndex.getIndexName());
         int[] primaryKeyFields = new int[numPrimaryKeys];
         for (int i = 0; i < numPrimaryKeys; i++) {
             primaryKeyFields[i] = i;
         }
 
-        ISearchOperationCallbackFactory searchCallbackFactory = 
dataset.getSearchCallbackFactory(
-                storageComponentProvider, theIndex, IndexOperation.SEARCH, 
primaryKeyFields);
+        ISearchOperationCallbackFactory searchCallbackFactory = dataset
+                .getSearchCallbackFactory(storageComponentProvider, theIndex, 
IndexOperation.SEARCH, primaryKeyFields);
         IStorageManager storageManager = 
getStorageComponentProvider().getStorageManager();
         IIndexDataflowHelperFactory indexHelperFactory = new 
IndexDataflowHelperFactory(storageManager, spPc.first);
         BTreeSearchOperatorDescriptor btreeSearchOp;
@@ -476,8 +478,8 @@ public class MetadataProvider implements 
IMetadataProvider<DataSourceId, String>
                     "Code generation error: no index " + indexName + " for 
dataset " + dataset.getDatasetName());
         }
         RecordDescriptor outputRecDesc = 
JobGenHelper.mkRecordDescriptor(typeEnv, opSchema, context);
-        Pair<IFileSplitProvider, AlgebricksPartitionConstraint> spPc =
-                getSplitProviderAndConstraints(dataset, 
secondaryIndex.getIndexName());
+        Pair<IFileSplitProvider, AlgebricksPartitionConstraint> spPc = 
getSplitProviderAndConstraints(dataset,
+                secondaryIndex.getIndexName());
         int[] primaryKeyFields = new int[numPrimaryKeys];
         for (int i = 0; i < numPrimaryKeys; i++) {
             primaryKeyFields[i] = i;
@@ -486,8 +488,8 @@ public class MetadataProvider implements 
IMetadataProvider<DataSourceId, String>
         ISearchOperationCallbackFactory searchCallbackFactory = 
dataset.getSearchCallbackFactory(
                 storageComponentProvider, secondaryIndex, 
IndexOperation.SEARCH, primaryKeyFields);
         RTreeSearchOperatorDescriptor rtreeSearchOp;
-        IIndexDataflowHelperFactory indexDataflowHelperFactory =
-                new 
IndexDataflowHelperFactory(storageComponentProvider.getStorageManager(), 
spPc.first);
+        IIndexDataflowHelperFactory indexDataflowHelperFactory = new 
IndexDataflowHelperFactory(
+                storageComponentProvider.getStorageManager(), spPc.first);
         if (dataset.getDatasetType() == DatasetType.INTERNAL) {
             rtreeSearchOp = new RTreeSearchOperatorDescriptor(jobSpec, 
outputRecDesc, keyFields, true, true,
                     indexDataflowHelperFactory, retainInput, retainMissing, 
context.getMissingWriterFactory(),
@@ -512,8 +514,8 @@ public class MetadataProvider implements 
IMetadataProvider<DataSourceId, String>
         File outFile = new File(fs.getPath());
         String nodeId = fs.getNodeName();
 
-        SinkWriterRuntimeFactory runtime =
-                new SinkWriterRuntimeFactory(printColumns, printerFactories, 
outFile, getWriterFactory(), inputDesc);
+        SinkWriterRuntimeFactory runtime = new 
SinkWriterRuntimeFactory(printColumns, printerFactories, outFile,
+                getWriterFactory(), inputDesc);
         AlgebricksPartitionConstraint apc = new 
AlgebricksAbsolutePartitionConstraint(new String[] { nodeId });
         return new Pair<>(runtime, apc);
     }
@@ -562,16 +564,16 @@ public class MetadataProvider implements 
IMetadataProvider<DataSourceId, String>
             fieldPermutation[numKeys + 1] = idx;
         }
 
-        Pair<IFileSplitProvider, AlgebricksPartitionConstraint> 
splitsAndConstraint =
-                getSplitProviderAndConstraints(dataset);
+        Pair<IFileSplitProvider, AlgebricksPartitionConstraint> 
splitsAndConstraint = getSplitProviderAndConstraints(
+                dataset);
         long numElementsHint = getCardinalityPerPartitionHint(dataset);
         // TODO
         // figure out the right behavior of the bulkload and then give the
         // right callback
         // (ex. what's the expected behavior when there is an error during
         // bulkload?)
-        IIndexDataflowHelperFactory indexHelperFactory =
-                new 
IndexDataflowHelperFactory(storageComponentProvider.getStorageManager(), 
splitsAndConstraint.first);
+        IIndexDataflowHelperFactory indexHelperFactory = new 
IndexDataflowHelperFactory(
+                storageComponentProvider.getStorageManager(), 
splitsAndConstraint.first);
         LSMIndexBulkLoadOperatorDescriptor btreeBulkLoad = new 
LSMIndexBulkLoadOperatorDescriptor(spec, null,
                 fieldPermutation, GlobalConfig.DEFAULT_TREE_FILL_FACTOR, 
false, numElementsHint, true,
                 indexHelperFactory, null, BulkLoadUsage.LOAD, 
dataset.getDatasetId());
@@ -688,8 +690,8 @@ public class MetadataProvider implements 
IMetadataProvider<DataSourceId, String>
             numElementsHint = Long.parseLong(numElementsHintString);
         }
         int numPartitions = 0;
-        List<String> nodeGroup =
-                MetadataManager.INSTANCE.getNodegroup(mdTxnCtx, 
dataset.getNodeGroupName()).getNodeNames();
+        List<String> nodeGroup = 
MetadataManager.INSTANCE.getNodegroup(mdTxnCtx, dataset.getNodeGroupName())
+                .getNodeNames();
         IClusterStateManager csm = appCtx.getClusterStateManager();
         for (String nd : nodeGroup) {
             numPartitions += csm.getNodePartitionsCount(nd);
@@ -705,9 +707,9 @@ public class MetadataProvider implements 
IMetadataProvider<DataSourceId, String>
                     getApplicationContext().getServiceContext(), adapterName, 
configuration, itemType, metaType);
 
             // check to see if dataset is indexed
-            Index filesIndex =
-                    MetadataManager.INSTANCE.getIndex(mdTxnCtx, 
dataset.getDataverseName(), dataset.getDatasetName(),
-                            
dataset.getDatasetName().concat(IndexingConstants.EXTERNAL_FILE_INDEX_NAME_SUFFIX));
+            Index filesIndex = MetadataManager.INSTANCE.getIndex(mdTxnCtx, 
dataset.getDataverseName(),
+                    dataset.getDatasetName(),
+                    
dataset.getDatasetName().concat(IndexingConstants.EXTERNAL_FILE_INDEX_NAME_SUFFIX));
 
             if (filesIndex != null && filesIndex.getPendingOp() == 0) {
                 // get files
@@ -743,7 +745,7 @@ public class MetadataProvider implements 
IMetadataProvider<DataSourceId, String>
 
     public FileSplit[] splitsForIndex(MetadataTransactionContext mdTxnCtx, 
Dataset dataset, String indexName)
             throws AlgebricksException {
-        return 
SplitsAndConstraintsUtil.getIndexSplits(appCtx.getClusterStateManager(), 
dataset, indexName, mdTxnCtx);
+        return SplitsAndConstraintsUtil.getIndexSplits(dataset, indexName, 
mdTxnCtx, appCtx.getClusterStateManager());
     }
 
     public DatasourceAdapter getAdapter(MetadataTransactionContext mdTxnCtx, 
String dataverseName, String adapterName)
@@ -769,21 +771,22 @@ public class MetadataProvider implements 
IMetadataProvider<DataSourceId, String>
             MetadataProvider metadataProvider, boolean retainMissing) throws 
AlgebricksException {
         try {
             // Get data type
-            ARecordType itemType =
-                    (ARecordType) 
MetadataManager.INSTANCE.getDatatype(metadataProvider.getMetadataTxnContext(),
-                            dataset.getDataverseName(), 
dataset.getItemTypeName()).getDatatype();
+            ARecordType itemType = (ARecordType) MetadataManager.INSTANCE
+                    .getDatatype(metadataProvider.getMetadataTxnContext(), 
dataset.getDataverseName(),
+                            dataset.getItemTypeName())
+                    .getDatatype();
             ExternalDatasetDetails datasetDetails = (ExternalDatasetDetails) 
dataset.getDatasetDetails();
             LookupAdapterFactory<?> adapterFactory = 
AdapterFactoryProvider.getLookupAdapterFactory(
                     getApplicationContext().getServiceContext(), 
datasetDetails.getProperties(), itemType, ridIndexes,
                     retainInput, retainMissing, 
context.getMissingWriterFactory());
             String fileIndexName = 
IndexingConstants.getFilesIndexName(dataset.getDatasetName());
-            Pair<IFileSplitProvider, AlgebricksPartitionConstraint> spPc =
-                    metadataProvider.getSplitProviderAndConstraints(dataset, 
fileIndexName);
+            Pair<IFileSplitProvider, AlgebricksPartitionConstraint> spPc = 
metadataProvider
+                    .getSplitProviderAndConstraints(dataset, fileIndexName);
             Index fileIndex = MetadataManager.INSTANCE.getIndex(mdTxnCtx, 
dataset.getDataverseName(),
                     dataset.getDatasetName(), fileIndexName);
             // Create the file index data flow helper
-            IIndexDataflowHelperFactory indexDataflowHelperFactory =
-                    new 
IndexDataflowHelperFactory(storageComponentProvider.getStorageManager(), 
spPc.first);
+            IIndexDataflowHelperFactory indexDataflowHelperFactory = new 
IndexDataflowHelperFactory(
+                    storageComponentProvider.getStorageManager(), spPc.first);
             // Create the out record descriptor, appContext and 
fileSplitProvider for the
             // files index
             RecordDescriptor outRecDesc = 
JobGenHelper.mkRecordDescriptor(typeEnv, opSchema, context);
@@ -849,12 +852,12 @@ public class MetadataProvider implements 
IMetadataProvider<DataSourceId, String>
             throw new AlgebricksException("Can only scan datasets of 
records.");
         }
 
-        ISerializerDeserializer<?> payloadSerde =
-                
getDataFormat().getSerdeProvider().getSerializerDeserializer(itemType);
+        ISerializerDeserializer<?> payloadSerde = 
getDataFormat().getSerdeProvider()
+                .getSerializerDeserializer(itemType);
         RecordDescriptor scannerDesc = new RecordDescriptor(new 
ISerializerDeserializer[] { payloadSerde });
 
-        ExternalScanOperatorDescriptor dataScanner =
-                new ExternalScanOperatorDescriptor(jobSpec, scannerDesc, 
adapterFactory);
+        ExternalScanOperatorDescriptor dataScanner = new 
ExternalScanOperatorDescriptor(jobSpec, scannerDesc,
+                adapterFactory);
 
         AlgebricksPartitionConstraint constraint;
         try {
@@ -880,9 +883,9 @@ public class MetadataProvider implements 
IMetadataProvider<DataSourceId, String>
 
         int i = 0;
         for (; i < sidxKeyFieldCount; ++i) {
-            Pair<IAType, Boolean> keyPairType =
-                    
Index.getNonNullableOpenFieldType(sidxKeyFieldTypes.get(i), 
sidxKeyFieldNames.get(i),
-                            (hasMeta && 
secondaryIndexIndicators.get(i).intValue() == 1) ? metaType : recType);
+            Pair<IAType, Boolean> keyPairType = 
Index.getNonNullableOpenFieldType(sidxKeyFieldTypes.get(i),
+                    sidxKeyFieldNames.get(i),
+                    (hasMeta && secondaryIndexIndicators.get(i).intValue() == 
1) ? metaType : recType);
             IAType keyType = keyPairType.first;
             comparatorFactories[i] = 
BinaryComparatorFactoryProvider.INSTANCE.getBinaryComparatorFactory(keyType, 
true);
             typeTraits[i] = TypeTraitProvider.INSTANCE.getTypeTrait(keyType);
@@ -920,8 +923,8 @@ public class MetadataProvider implements 
IMetadataProvider<DataSourceId, String>
             List<LogicalVariable> additionalNonFilteringFields) throws 
AlgebricksException {
 
         String datasetName = dataSource.getId().getDatasourceName();
-        Dataset dataset =
-                MetadataManagerUtil.findExistingDataset(mdTxnCtx, 
dataSource.getId().getDataverseName(), datasetName);
+        Dataset dataset = MetadataManagerUtil.findExistingDataset(mdTxnCtx, 
dataSource.getId().getDataverseName(),
+                datasetName);
         int numKeys = keys.size();
         int numFilterFields = DatasetUtil.getFilterField(dataset) == null ? 0 
: 1;
         // Move key fields to front.
@@ -949,18 +952,18 @@ public class MetadataProvider implements 
IMetadataProvider<DataSourceId, String>
 
         Index primaryIndex = MetadataManager.INSTANCE.getIndex(mdTxnCtx, 
dataset.getDataverseName(),
                 dataset.getDatasetName(), dataset.getDatasetName());
-        Pair<IFileSplitProvider, AlgebricksPartitionConstraint> 
splitsAndConstraint =
-                getSplitProviderAndConstraints(dataset);
+        Pair<IFileSplitProvider, AlgebricksPartitionConstraint> 
splitsAndConstraint = getSplitProviderAndConstraints(
+                dataset);
 
         // prepare callback
         int[] primaryKeyFields = new int[numKeys];
         for (i = 0; i < numKeys; i++) {
             primaryKeyFields[i] = i;
         }
-        IModificationOperationCallbackFactory modificationCallbackFactory = 
dataset.getModificationCallbackFactory(
-                storageComponentProvider, primaryIndex, indexOp, 
primaryKeyFields);
-        IIndexDataflowHelperFactory idfh =
-                new 
IndexDataflowHelperFactory(storageComponentProvider.getStorageManager(), 
splitsAndConstraint.first);
+        IModificationOperationCallbackFactory modificationCallbackFactory = 
dataset
+                .getModificationCallbackFactory(storageComponentProvider, 
primaryIndex, indexOp, primaryKeyFields);
+        IIndexDataflowHelperFactory idfh = new 
IndexDataflowHelperFactory(storageComponentProvider.getStorageManager(),
+                splitsAndConstraint.first);
         IOperatorDescriptor op;
         if (bulkload) {
             long numElementsHint = getCardinalityPerPartitionHint(dataset);
@@ -1119,8 +1122,8 @@ public class MetadataProvider implements 
IMetadataProvider<DataSourceId, String>
                 dataset.getDatasetName(), indexName);
         List<List<String>> secondaryKeyExprs = 
secondaryIndex.getKeyFieldNames();
         List<IAType> secondaryKeyTypes = secondaryIndex.getKeyFieldTypes();
-        Pair<IAType, Boolean> keyPairType =
-                Index.getNonNullableOpenFieldType(secondaryKeyTypes.get(0), 
secondaryKeyExprs.get(0), recType);
+        Pair<IAType, Boolean> keyPairType = 
Index.getNonNullableOpenFieldType(secondaryKeyTypes.get(0),
+                secondaryKeyExprs.get(0), recType);
         IAType spatialType = keyPairType.first;
         int dimension = 
NonTaggedFormatUtil.getNumDimensions(spatialType.getTypeTag());
         int numSecondaryKeys = dimension * 2;
@@ -1173,14 +1176,14 @@ public class MetadataProvider implements 
IMetadataProvider<DataSourceId, String>
                 prevFieldPermutation[numKeys] = idx;
             }
         }
-        Pair<IFileSplitProvider, AlgebricksPartitionConstraint> 
splitsAndConstraint =
-                getSplitProviderAndConstraints(dataset, 
secondaryIndex.getIndexName());
+        Pair<IFileSplitProvider, AlgebricksPartitionConstraint> 
splitsAndConstraint = getSplitProviderAndConstraints(
+                dataset, secondaryIndex.getIndexName());
 
         // prepare callback
         IModificationOperationCallbackFactory modificationCallbackFactory = 
dataset.getModificationCallbackFactory(
                 storageComponentProvider, secondaryIndex, indexOp, 
modificationCallbackPrimaryKeyFields);
-        IIndexDataflowHelperFactory indexDataflowHelperFactory =
-                new 
IndexDataflowHelperFactory(storageComponentProvider.getStorageManager(), 
splitsAndConstraint.first);
+        IIndexDataflowHelperFactory indexDataflowHelperFactory = new 
IndexDataflowHelperFactory(
+                storageComponentProvider.getStorageManager(), 
splitsAndConstraint.first);
         IOperatorDescriptor op;
         if (bulkload) {
             long numElementsHint = getCardinalityPerPartitionHint(dataset);
@@ -1501,8 +1504,8 @@ public class MetadataProvider implements 
IMetadataProvider<DataSourceId, String>
             return null;
         }
         IExpressionRuntimeProvider expressionRuntimeProvider = 
context.getExpressionRuntimeProvider();
-        IScalarEvaluatorFactory filterEvalFactory =
-                expressionRuntimeProvider.createEvaluatorFactory(filterExpr, 
typeEnv, inputSchemas, context);
+        IScalarEvaluatorFactory filterEvalFactory = 
expressionRuntimeProvider.createEvaluatorFactory(filterExpr,
+                typeEnv, inputSchemas, context);
         return new AsterixTupleFilterFactory(filterEvalFactory, 
context.getBinaryBooleanInspectorFactory());
     }
 

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5dcf139e/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SplitsAndConstraintsUtil.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SplitsAndConstraintsUtil.java
 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SplitsAndConstraintsUtil.java
index 7ac6183..2a6d0e8 100644
--- 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SplitsAndConstraintsUtil.java
+++ 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SplitsAndConstraintsUtil.java
@@ -24,6 +24,7 @@ import java.util.List;
 
 import org.apache.asterix.common.cluster.ClusterPartition;
 import org.apache.asterix.common.cluster.IClusterStateManager;
+import org.apache.asterix.common.exceptions.MetadataException;
 import org.apache.asterix.common.utils.StoragePathUtil;
 import org.apache.asterix.metadata.MetadataManager;
 import org.apache.asterix.metadata.MetadataTransactionContext;
@@ -52,14 +53,18 @@ public class SplitsAndConstraintsUtil {
         return splits.toArray(new FileSplit[] {});
     }
 
-    public static FileSplit[] getIndexSplits(IClusterStateManager 
clusterStateManager, Dataset dataset,
-            String indexName, MetadataTransactionContext mdTxnCtx) throws 
AlgebricksException {
-        NodeGroup nodeGroup = MetadataManager.INSTANCE.getNodegroup(mdTxnCtx, 
dataset.getNodeGroupName());
-        if (nodeGroup == null) {
-            throw new AlgebricksException("Couldn't find node group " + 
dataset.getNodeGroupName());
+    public static FileSplit[] getIndexSplits(Dataset dataset, String 
indexName, MetadataTransactionContext mdTxnCtx,
+            IClusterStateManager csm) throws AlgebricksException {
+        try {
+            NodeGroup nodeGroup = 
MetadataManager.INSTANCE.getNodegroup(mdTxnCtx, dataset.getNodeGroupName());
+            if (nodeGroup == null) {
+                throw new AlgebricksException("Couldn't find node group " + 
dataset.getNodeGroupName());
+            }
+            List<String> nodeList = nodeGroup.getNodeNames();
+            return getIndexSplits(csm, dataset, indexName, nodeList);
+        } catch (MetadataException me) {
+            throw new AlgebricksException(me);
         }
-        List<String> nodeList = nodeGroup.getNodeNames();
-        return getIndexSplits(clusterStateManager, dataset, indexName, 
nodeList);
     }
 
     public static FileSplit[] getIndexSplits(IClusterStateManager 
clusterStateManager, Dataset dataset,

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5dcf139e/asterixdb/asterix-replication/pom.xml
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-replication/pom.xml 
b/asterixdb/asterix-replication/pom.xml
index 2b5fe0c..41afc1d 100644
--- a/asterixdb/asterix-replication/pom.xml
+++ b/asterixdb/asterix-replication/pom.xml
@@ -72,6 +72,14 @@
       <version>${project.version}</version>
     </dependency>
     <dependency>
+      <groupId>org.apache.hyracks</groupId>
+      <artifactId>hyracks-control-nc</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hyracks</groupId>
+      <artifactId>hyracks-control-common</artifactId>
+    </dependency>
+    <dependency>
       <groupId>com.fasterxml.jackson.core</groupId>
       <artifactId>jackson-core</artifactId>
     </dependency>

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5dcf139e/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicaStateChecker.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicaStateChecker.java
 
b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicaStateChecker.java
index be7cdc5..e260de5 100644
--- 
a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicaStateChecker.java
+++ 
b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicaStateChecker.java
@@ -25,7 +25,6 @@ import java.nio.channels.SocketChannel;
 import java.nio.channels.UnresolvedAddressException;
 import java.util.concurrent.Callable;
 
-import org.apache.asterix.common.config.ReplicationProperties;
 import org.apache.asterix.common.replication.Replica;
 import org.apache.asterix.common.replication.Replica.ReplicaState;
 import org.apache.asterix.replication.functions.ReplicationProtocol;
@@ -36,15 +35,13 @@ public class ReplicaStateChecker implements Callable<Void> {
     private final Replica replica;
     private final int replicationTimeOut;
     private final ReplicationManager replicationManager;
-    private final ReplicationProperties asterixReplicationProperties;
     private final boolean suspendReplication;
 
     public ReplicaStateChecker(Replica replica, int replicationTimeOut, 
ReplicationManager replicationManager,
-            ReplicationProperties asterixReplicationProperties, boolean 
suspendReplication) {
+            boolean suspendReplication) {
         this.replica = replica;
         this.replicationTimeOut = replicationTimeOut;
         this.replicationManager = replicationManager;
-        this.asterixReplicationProperties = asterixReplicationProperties;
         this.suspendReplication = suspendReplication;
     }
 
@@ -53,7 +50,8 @@ public class ReplicaStateChecker implements Callable<Void> {
         Thread.currentThread().setName("ReplicaConnector Thread");
 
         long startTime = System.currentTimeMillis();
-        InetSocketAddress replicaAddress = 
replica.getAddress(asterixReplicationProperties);
+        InetSocketAddress replicaAddress = 
InetSocketAddress.createUnresolved(replica.getClusterIp(),
+                replica.getPort());
 
         while (true) {
             try (SocketChannel connection = SocketChannel.open()) {

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5dcf139e/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationChannel.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationChannel.java
 
b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationChannel.java
index ba24e07..32e9498 100644
--- 
a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationChannel.java
+++ 
b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationChannel.java
@@ -28,6 +28,7 @@ import java.nio.channels.AsynchronousCloseException;
 import java.nio.channels.FileChannel;
 import java.nio.channels.ServerSocketChannel;
 import java.nio.channels.SocketChannel;
+import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
@@ -54,6 +55,7 @@ import 
org.apache.asterix.common.replication.IReplicationChannel;
 import org.apache.asterix.common.replication.IReplicationManager;
 import org.apache.asterix.common.replication.IReplicationStrategy;
 import org.apache.asterix.common.replication.IReplicationThread;
+import org.apache.asterix.common.replication.Replica;
 import org.apache.asterix.common.replication.ReplicaEvent;
 import org.apache.asterix.common.storage.DatasetResourceReference;
 import org.apache.asterix.common.storage.IIndexCheckpointManager;
@@ -82,6 +84,8 @@ import 
org.apache.asterix.replication.storage.ReplicaResourcesManager;
 import 
org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepository;
 import org.apache.asterix.transaction.management.service.logging.LogBuffer;
 import org.apache.hyracks.api.application.INCServiceContext;
+import org.apache.hyracks.control.common.controllers.NCConfig;
+import org.apache.hyracks.control.nc.NodeControllerService;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.storage.am.lsm.common.api.LSMOperationType;
 import org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMIndex;
@@ -116,12 +120,16 @@ public class ReplicationChannel extends Thread implements 
IReplicationChannel {
     private final Object flushLogslock = new Object();
     private final IDatasetLifecycleManager dsLifecycleManager;
     private final PersistentLocalResourceRepository localResourceRep;
+    private final IReplicationStrategy replicationStrategy;
+    private final NCConfig ncConfig;
+    private Set nodeHostedPartitions;
     private final IIndexCheckpointManagerProvider 
indexCheckpointManagerProvider;
     private final INcApplicationContext appCtx;
 
     public ReplicationChannel(String nodeId, ReplicationProperties 
replicationProperties, ILogManager logManager,
             IReplicaResourcesManager replicaResoucesManager, 
IReplicationManager replicationManager,
-            INCServiceContext ncServiceContext, IAppRuntimeContextProvider 
asterixAppRuntimeContextProvider) {
+            INCServiceContext ncServiceContext, IAppRuntimeContextProvider 
asterixAppRuntimeContextProvider,
+            IReplicationStrategy replicationStrategy) {
         this.logManager = logManager;
         this.localNodeID = nodeId;
         this.replicaResourcesManager = (ReplicaResourcesManager) 
replicaResoucesManager;
@@ -129,8 +137,10 @@ public class ReplicationChannel extends Thread implements 
IReplicationChannel {
         this.replicationProperties = replicationProperties;
         this.appContextProvider = asterixAppRuntimeContextProvider;
         this.dsLifecycleManager = 
asterixAppRuntimeContextProvider.getDatasetLifecycleManager();
-        this.localResourceRep =
-                (PersistentLocalResourceRepository) 
asterixAppRuntimeContextProvider.getLocalResourceRepository();
+        this.localResourceRep = (PersistentLocalResourceRepository) 
asterixAppRuntimeContextProvider
+                .getLocalResourceRepository();
+        this.replicationStrategy = replicationStrategy;
+        this.ncConfig = ((NodeControllerService) 
ncServiceContext.getControllerService()).getConfiguration();
         lsmComponentRemoteLSN2LocalLSNMappingTaskQ = new 
LinkedBlockingQueue<>();
         pendingNotificationRemoteLogsQ = new LinkedBlockingQueue<>();
         lsmComponentId2PropertiesMap = new ConcurrentHashMap<>();
@@ -139,21 +149,34 @@ public class ReplicationChannel extends Thread implements 
IReplicationChannel {
         lsmComponentLSNMappingService = new LSMComponentsSyncService();
         replicationNotifier = new ReplicationNotifier();
         replicationThreads = 
Executors.newCachedThreadPool(ncServiceContext.getThreadFactory());
+        Map<String, ClusterPartition[]> nodePartitions = 
asterixAppRuntimeContextProvider.getAppContext()
+                .getMetadataProperties().getNodePartitions();
+        Set<String> nodeReplicationClients = 
replicationStrategy.getRemotePrimaryReplicas(nodeId).stream()
+                .map(Replica::getId).collect(Collectors.toSet());
+        List<Integer> clientsPartitions = new ArrayList<>();
+        for (String clientId : nodeReplicationClients) {
+            for (ClusterPartition clusterPartition : 
nodePartitions.get(clientId)) {
+                clientsPartitions.add(clusterPartition.getPartitionId());
+            }
+        }
+        nodeHostedPartitions = new HashSet<>(clientsPartitions.size());
+        nodeHostedPartitions.addAll(clientsPartitions);
+        this.indexCheckpointManagerProvider =
+                ((INcApplicationContext) 
ncServiceContext.getApplicationContext()).getIndexCheckpointManagerProvider();
         this.appCtx = (INcApplicationContext) 
ncServiceContext.getApplicationContext();
-        this.indexCheckpointManagerProvider = 
appCtx.getIndexCheckpointManagerProvider();
     }
 
     @Override
     public void run() {
         Thread.currentThread().setName("Replication Channel Thread");
 
-        String nodeIP = replicationProperties.getReplicaIPAddress(localNodeID);
-        int dataPort = 
replicationProperties.getDataReplicationPort(localNodeID);
+        String nodeIP = replicationProperties.getNodeIpFromId(localNodeID);
+        int dataPort = ncConfig.getReplicationPublicPort();
         try {
             serverSocketChannel = ServerSocketChannel.open();
             serverSocketChannel.configureBlocking(true);
-            InetSocketAddress replicationChannelAddress =
-                    new InetSocketAddress(InetAddress.getByName(nodeIP), 
dataPort);
+            InetSocketAddress replicationChannelAddress = new 
InetSocketAddress(InetAddress.getByName(nodeIP),
+                    dataPort);
             serverSocketChannel.socket().bind(replicationChannelAddress);
             lsmComponentLSNMappingService.start();
             replicationNotifier.start();
@@ -182,9 +205,8 @@ public class ReplicationChannel extends Thread implements 
IReplicationChannel {
         if (remainingFile == 0) {
             if (lsmCompProp.getOpType() == LSMOperationType.FLUSH && 
lsmCompProp.getReplicaLSN() != null
                     && 
replicaUniqueLSN2RemoteMapping.containsKey(lsmCompProp.getNodeUniqueLSN())) {
-                int remainingIndexes =
-                        
replicaUniqueLSN2RemoteMapping.get(lsmCompProp.getNodeUniqueLSN()).numOfFlushedIndexes
-                                .decrementAndGet();
+                int remainingIndexes = replicaUniqueLSN2RemoteMapping
+                        
.get(lsmCompProp.getNodeUniqueLSN()).numOfFlushedIndexes.decrementAndGet();
                 if (remainingIndexes == 0) {
                     /**
                      * Note: there is a chance that this will never be removed 
because some
@@ -228,8 +250,8 @@ public class ReplicationChannel extends Thread implements 
IReplicationChannel {
         public void run() {
             Thread.currentThread().setName("Replication Thread");
             try {
-                ReplicationRequestType replicationFunction =
-                        ReplicationProtocol.getRequestType(socketChannel, 
inBuffer);
+                ReplicationRequestType replicationFunction = 
ReplicationProtocol.getRequestType(socketChannel,
+                        inBuffer);
                 while (replicationFunction != ReplicationRequestType.GOODBYE) {
                     switch (replicationFunction) {
                         case REPLICATE_LOG:
@@ -275,7 +297,7 @@ public class ReplicationChannel extends Thread implements 
IReplicationChannel {
                 }
             } catch (Exception e) {
                 if (LOGGER.isLoggable(Level.WARNING)) {
-                    LOGGER.log(Level.WARNING, "Unexpectedly error during 
replication.", e);
+                    LOGGER.log(Level.WARNING, "Unexpected error during 
replication.", e);
                 }
             } finally {
                 if (socketChannel.isOpen()) {
@@ -305,8 +327,8 @@ public class ReplicationChannel extends Thread implements 
IReplicationChannel {
             Set<Integer> datasetsToForceFlush = new HashSet<>();
             for (IndexInfo iInfo : openIndexesInfo) {
                 if 
(requestedIndexesToBeFlushed.contains(iInfo.getResourceId())) {
-                    AbstractLSMIOOperationCallback ioCallback =
-                            (AbstractLSMIOOperationCallback) 
iInfo.getIndex().getIOOperationCallback();
+                    AbstractLSMIOOperationCallback ioCallback = 
(AbstractLSMIOOperationCallback) iInfo.getIndex()
+                            .getIOOperationCallback();
                     //if an index has a pending flush, then the request to 
flush it will succeed.
                     if (ioCallback.hasPendingFlush()) {
                         //remove index to indicate that it will be flushed
@@ -400,12 +422,11 @@ public class ReplicationChannel extends Thread implements 
IReplicationChannel {
             List<String> filesList;
             Set<Integer> partitionIds = request.getPartitionIds();
             Set<String> requesterExistingFiles = request.getExistingFiles();
-            Map<Integer, ClusterPartition> clusterPartitions =
-                    
appContextProvider.getAppContext().getMetadataProperties().getClusterPartitions();
+            Map<Integer, ClusterPartition> clusterPartitions = 
appContextProvider.getAppContext()
+                    .getMetadataProperties().getClusterPartitions();
 
-            final IReplicationStrategy repStrategy = 
replicationProperties.getReplicationStrategy();
             // Flush replicated datasets to generate the latest LSM components
-            dsLifecycleManager.flushDataset(repStrategy);
+            dsLifecycleManager.flushDataset(replicationStrategy);
             for (Integer partitionId : partitionIds) {
                 ClusterPartition partition = 
clusterPartitions.get(partitionId);
                 filesList = 
replicaResourcesManager.getPartitionIndexesFiles(partition.getPartitionId(), 
false);
@@ -413,7 +434,7 @@ public class ReplicationChannel extends Thread implements 
IReplicationChannel {
                 for (String filePath : filesList) {
                     // Send only files of datasets that are replciated.
                     DatasetResourceReference indexFileRef = 
localResourceRep.getLocalResourceReference(filePath);
-                    if (!repStrategy.isMatch(indexFileRef.getDatasetId())) {
+                    if 
(!replicationStrategy.isMatch(indexFileRef.getDatasetId())) {
                         continue;
                     }
                     String relativeFilePath = 
StoragePathUtil.getIndexFileRelativePath(filePath);

Reply via email to