http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5dcf139e/asterixdb/asterix-installer/src/test/resources/transactionts/scripts/recovery_ddl/load_after_recovery/kill_cc_and_nc.sh ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-installer/src/test/resources/transactionts/scripts/recovery_ddl/load_after_recovery/kill_cc_and_nc.sh b/asterixdb/asterix-installer/src/test/resources/transactionts/scripts/recovery_ddl/load_after_recovery/kill_cc_and_nc.sh deleted file mode 100755 index 4b876be..0000000 --- a/asterixdb/asterix-installer/src/test/resources/transactionts/scripts/recovery_ddl/load_after_recovery/kill_cc_and_nc.sh +++ /dev/null @@ -1,18 +0,0 @@ -#!/usr/bin/env bash -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -ps -ef | awk '/java.*org\.apache\.hyracks\.control\.[cn]c\.[CN]CDriver/ {print $2}' | xargs -n 1 kill -9
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5dcf139e/asterixdb/asterix-installer/src/test/resources/transactionts/scripts/recovery_ddl/load_after_recovery/stop_and_delete.sh ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-installer/src/test/resources/transactionts/scripts/recovery_ddl/load_after_recovery/stop_and_delete.sh b/asterixdb/asterix-installer/src/test/resources/transactionts/scripts/recovery_ddl/load_after_recovery/stop_and_delete.sh deleted file mode 100755 index eb1c01e..0000000 --- a/asterixdb/asterix-installer/src/test/resources/transactionts/scripts/recovery_ddl/load_after_recovery/stop_and_delete.sh +++ /dev/null @@ -1,20 +0,0 @@ -#!/usr/bin/env bash -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -$MANAGIX_HOME/bin/managix stop -n asterix; -$MANAGIX_HOME/bin/managix delete -n asterix; - http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5dcf139e/asterixdb/asterix-installer/src/test/resources/transactionts/scripts/recovery_ddl/load_after_recovery/stop_and_start.sh ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-installer/src/test/resources/transactionts/scripts/recovery_ddl/load_after_recovery/stop_and_start.sh b/asterixdb/asterix-installer/src/test/resources/transactionts/scripts/recovery_ddl/load_after_recovery/stop_and_start.sh deleted file mode 100755 index 2fb80ce..0000000 --- a/asterixdb/asterix-installer/src/test/resources/transactionts/scripts/recovery_ddl/load_after_recovery/stop_and_start.sh +++ /dev/null @@ -1,20 +0,0 @@ -#!/usr/bin/env bash -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -$MANAGIX_HOME/bin/managix stop -n asterix; -$MANAGIX_HOME/bin/managix start -n asterix; - http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5dcf139e/asterixdb/asterix-installer/src/test/resources/transactionts/scripts/recovery_ddl/secondary_index_recovery/create_and_start.sh ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-installer/src/test/resources/transactionts/scripts/recovery_ddl/secondary_index_recovery/create_and_start.sh b/asterixdb/asterix-installer/src/test/resources/transactionts/scripts/recovery_ddl/secondary_index_recovery/create_and_start.sh deleted file mode 100755 index 789914b..0000000 --- a/asterixdb/asterix-installer/src/test/resources/transactionts/scripts/recovery_ddl/secondary_index_recovery/create_and_start.sh +++ /dev/null @@ -1,18 +0,0 @@ -#!/usr/bin/env bash -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -$MANAGIX_HOME/bin/managix create -n asterix -c $MANAGIX_HOME/clusters/local/local.xml; http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5dcf139e/asterixdb/asterix-installer/src/test/resources/transactionts/scripts/recovery_ddl/secondary_index_recovery/kill_cc_and_nc.sh ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-installer/src/test/resources/transactionts/scripts/recovery_ddl/secondary_index_recovery/kill_cc_and_nc.sh b/asterixdb/asterix-installer/src/test/resources/transactionts/scripts/recovery_ddl/secondary_index_recovery/kill_cc_and_nc.sh deleted file mode 100755 index 4b876be..0000000 --- a/asterixdb/asterix-installer/src/test/resources/transactionts/scripts/recovery_ddl/secondary_index_recovery/kill_cc_and_nc.sh +++ /dev/null @@ -1,18 +0,0 @@ -#!/usr/bin/env bash -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -ps -ef | awk '/java.*org\.apache\.hyracks\.control\.[cn]c\.[CN]CDriver/ {print $2}' | xargs -n 1 kill -9 http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5dcf139e/asterixdb/asterix-installer/src/test/resources/transactionts/scripts/recovery_ddl/secondary_index_recovery/stop_and_delete.sh ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-installer/src/test/resources/transactionts/scripts/recovery_ddl/secondary_index_recovery/stop_and_delete.sh b/asterixdb/asterix-installer/src/test/resources/transactionts/scripts/recovery_ddl/secondary_index_recovery/stop_and_delete.sh deleted file mode 100755 index eb1c01e..0000000 --- a/asterixdb/asterix-installer/src/test/resources/transactionts/scripts/recovery_ddl/secondary_index_recovery/stop_and_delete.sh +++ /dev/null @@ -1,20 +0,0 @@ -#!/usr/bin/env bash -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -$MANAGIX_HOME/bin/managix stop -n asterix; -$MANAGIX_HOME/bin/managix delete -n asterix; - http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5dcf139e/asterixdb/asterix-installer/src/test/resources/transactionts/scripts/recovery_ddl/secondary_index_recovery/stop_and_start.sh ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-installer/src/test/resources/transactionts/scripts/recovery_ddl/secondary_index_recovery/stop_and_start.sh b/asterixdb/asterix-installer/src/test/resources/transactionts/scripts/recovery_ddl/secondary_index_recovery/stop_and_start.sh deleted file mode 100755 index 2fb80ce..0000000 --- a/asterixdb/asterix-installer/src/test/resources/transactionts/scripts/recovery_ddl/secondary_index_recovery/stop_and_start.sh +++ /dev/null @@ -1,20 +0,0 @@ -#!/usr/bin/env bash -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -$MANAGIX_HOME/bin/managix stop -n asterix; -$MANAGIX_HOME/bin/managix start -n asterix; - http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5dcf139e/asterixdb/asterix-installer/src/test/resources/transactionts/scripts/setup_teardown/configure_and_validate.sh ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-installer/src/test/resources/transactionts/scripts/setup_teardown/configure_and_validate.sh b/asterixdb/asterix-installer/src/test/resources/transactionts/scripts/setup_teardown/configure_and_validate.sh deleted file mode 100755 index e30be0c..0000000 --- a/asterixdb/asterix-installer/src/test/resources/transactionts/scripts/setup_teardown/configure_and_validate.sh +++ /dev/null @@ -1,20 +0,0 @@ -#!/usr/bin/env bash -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -$MANAGIX_HOME/bin/managix configure; -$MANAGIX_HOME/bin/managix validate; -$MANAGIX_HOME/bin/managix validate -c $MANAGIX_HOME/clusters/local/local.xml; http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5dcf139e/asterixdb/asterix-installer/src/test/resources/transactionts/scripts/setup_teardown/kill_cc_and_nc.sh ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-installer/src/test/resources/transactionts/scripts/setup_teardown/kill_cc_and_nc.sh b/asterixdb/asterix-installer/src/test/resources/transactionts/scripts/setup_teardown/kill_cc_and_nc.sh deleted file mode 100755 index 4b876be..0000000 --- a/asterixdb/asterix-installer/src/test/resources/transactionts/scripts/setup_teardown/kill_cc_and_nc.sh +++ /dev/null @@ -1,18 +0,0 @@ -#!/usr/bin/env bash -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -ps -ef | awk '/java.*org\.apache\.hyracks\.control\.[cn]c\.[CN]CDriver/ {print $2}' | xargs -n 1 kill -9 http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5dcf139e/asterixdb/asterix-installer/src/test/resources/transactionts/scripts/setup_teardown/shutdown.sh ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-installer/src/test/resources/transactionts/scripts/setup_teardown/shutdown.sh b/asterixdb/asterix-installer/src/test/resources/transactionts/scripts/setup_teardown/shutdown.sh deleted file mode 100755 index e46b4e5..0000000 --- a/asterixdb/asterix-installer/src/test/resources/transactionts/scripts/setup_teardown/shutdown.sh +++ /dev/null @@ -1,18 +0,0 @@ -#!/usr/bin/env bash -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -$MANAGIX_HOME/bin/managix shutdown; http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5dcf139e/asterixdb/asterix-installer/src/test/resources/transactionts/scripts/setup_teardown/stop_and_delete.sh ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-installer/src/test/resources/transactionts/scripts/setup_teardown/stop_and_delete.sh b/asterixdb/asterix-installer/src/test/resources/transactionts/scripts/setup_teardown/stop_and_delete.sh deleted file mode 100755 index eb1c01e..0000000 --- a/asterixdb/asterix-installer/src/test/resources/transactionts/scripts/setup_teardown/stop_and_delete.sh +++ /dev/null @@ -1,20 +0,0 @@ -#!/usr/bin/env bash -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -$MANAGIX_HOME/bin/managix stop -n asterix; -$MANAGIX_HOME/bin/managix delete -n asterix; - http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5dcf139e/asterixdb/asterix-installer/src/test/resources/transactionts/testsuite.xml ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-installer/src/test/resources/transactionts/testsuite.xml b/asterixdb/asterix-installer/src/test/resources/transactionts/testsuite.xml deleted file mode 100644 index 38179b2..0000000 --- a/asterixdb/asterix-installer/src/test/resources/transactionts/testsuite.xml +++ /dev/null @@ -1,219 +0,0 @@ -<!-- - ! Licensed to the Apache Software Foundation (ASF) under one - ! or more contributor license agreements. See the NOTICE file - ! distributed with this work for additional information - ! regarding copyright ownership. The ASF licenses this file - ! to you under the Apache License, Version 2.0 (the - ! "License"); you may not use this file except in compliance - ! with the License. You may obtain a copy of the License at - ! - ! http://www.apache.org/licenses/LICENSE-2.0 - ! - ! Unless required by applicable law or agreed to in writing, - ! software distributed under the License is distributed on an - ! "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - ! KIND, either express or implied. See the License for the - ! specific language governing permissions and limitations - ! under the License. - !--> -<test-suite xmlns="urn:xml.testframework.asterix.apache.org" ResultOffsetPath="results" QueryOffsetPath="queries" QueryFileExtension=".aql"> - <test-group name="query_after_restart"> - <test-case FilePath="query_after_restart"> - <compilation-unit name="dataset-with-meta-record"> - <output-dir compare="Text">dataset-with-meta-record</output-dir> - </compilation-unit> - </test-case> - <test-case FilePath="query_after_restart"> - <compilation-unit name="external_index"> - <output-dir compare="Text">external_index</output-dir> - </compilation-unit> - </test-case> - <test-case FilePath="query_after_restart"> - <compilation-unit name="big_object_20M"> - <output-dir compare="Text">big_object_20M</output-dir> - </compilation-unit> - </test-case> - </test-group> - <test-group name="dml_after_restart"> - <test-case FilePath="dml_after_restart"> - <compilation-unit name="multiple_secondary_indices"> - <output-dir compare="Text">multiple_secondary_indices</output-dir> - </compilation-unit> - </test-case> - </test-group> - <test-group name="recover_after_abort"> - - <test-case FilePath="recover_after_abort"> - <compilation-unit name="primary_index_only"> - <output-dir compare="Text">primary_index_only</output-dir> - </compilation-unit> - </test-case> - - <test-case FilePath="recover_after_abort"> - <compilation-unit name="primary_plus_default_secondary_index"> - <output-dir compare="Text">primary_plus_default_secondary_index</output-dir> - </compilation-unit> - </test-case> - - <test-case FilePath="recover_after_abort"> - <compilation-unit name="primary_plus_default_correlated_secondary_index"> - <output-dir compare="Text">primary_plus_default_secondary_index</output-dir> - </compilation-unit> - </test-case> - - <test-case FilePath="recover_after_abort"> - <compilation-unit name="primary_plus_rtree_index"> - <output-dir compare="Text">primary_plus_rtree_index</output-dir> - </compilation-unit> - </test-case> - - <test-case FilePath="recover_after_abort"> - <compilation-unit name="primary_plus_correlated_rtree_index"> - <output-dir compare="Text">primary_plus_rtree_index</output-dir> - </compilation-unit> - </test-case> - - <test-case FilePath="recover_after_abort"> - <compilation-unit name="primary_plus_rtree_index_insert_and_delete"> - <output-dir compare="Text">primary_plus_rtree_index_insert_and_delete</output-dir> - </compilation-unit> - </test-case> - - <test-case FilePath="recover_after_abort"> - <compilation-unit name="primary_plus_keyword_secondary_index"> - <output-dir compare="Text">primary_plus_keyword_secondary_index</output-dir> - </compilation-unit> - </test-case> - - <test-case FilePath="recover_after_abort"> - <compilation-unit name="primary_plus_keyword_correlated_secondary_index"> - <output-dir compare="Text">primary_plus_keyword_secondary_index</output-dir> - </compilation-unit> - </test-case> - - <test-case FilePath="recover_after_abort"> - <compilation-unit name="primary_plus_ngram_index"> - <output-dir compare="Text">primary_plus_ngram_index</output-dir> - </compilation-unit> - </test-case> - - <test-case FilePath="recover_after_abort"> - <compilation-unit name="primary_plus_correlated_ngram_index"> - <output-dir compare="Text">primary_plus_ngram_index</output-dir> - </compilation-unit> - </test-case> - - <test-case FilePath="recover_after_abort"> - <compilation-unit name="primary_plus_multiple_secondary_indices"> - <output-dir compare="Text">primary_plus_multiple_secondary_indices</output-dir> - </compilation-unit> - </test-case> - - <test-case FilePath="recover_after_abort"> - <compilation-unit name="primary_plus_multiple_correlated_secondary_indices"> - <output-dir compare="Text">primary_plus_multiple_secondary_indices</output-dir> - </compilation-unit> - </test-case> - - <test-case FilePath="recover_after_abort"> - <compilation-unit name="primary_index_only_filtered"> - <output-dir compare="Text">primary_index_only_filtered</output-dir> - </compilation-unit> - </test-case> - - <test-case FilePath="recover_after_abort"> - <compilation-unit name="primary_plus_default_secondary_index_filtered"> - <output-dir compare="Text">primary_plus_default_secondary_index_filtered</output-dir> - </compilation-unit> - </test-case> - - <test-case FilePath="recover_after_abort"> - <compilation-unit name="primary_plus_rtree_index_filtered"> - <output-dir compare="Text">primary_plus_rtree_index_filtered</output-dir> - </compilation-unit> - </test-case> - - <test-case FilePath="recover_after_abort"> - <compilation-unit name="primary_plus_rtree_index_insert_and_delete_filtered"> - <output-dir compare="Text">primary_plus_rtree_index_insert_and_delete_filtered</output-dir> - </compilation-unit> - </test-case> - - <test-case FilePath="recover_after_abort"> - <compilation-unit name="primary_plus_keyword_secondary_index_filtered"> - <output-dir compare="Text">primary_plus_keyword_secondary_index_filtered</output-dir> - </compilation-unit> - </test-case> - - <test-case FilePath="recover_after_abort"> - <compilation-unit name="primary_plus_ngram_index"> - <output-dir compare="Text">primary_plus_ngram_index_filtered</output-dir> - </compilation-unit> - </test-case> - - <test-case FilePath="recover_after_abort"> - <compilation-unit name="primary_plus_multiple_secondary_indices"> - <output-dir compare="Text">primary_plus_multiple_secondary_indices_filtered</output-dir> - </compilation-unit> - </test-case> - - <test-case FilePath="recover_after_abort"> - <compilation-unit name="primary_plus_multiple_secondary_indices"><!-- The only exception here is during the kill command which is in a different JVM, hence not caught --> - <output-dir compare="Text">primary_plus_multiple_secondary_indices</output-dir> - <!-- <expected-error>org.apache.hyracks.algebricks.common.exceptions.AlgebricksException</expected-error> --> - </compilation-unit> - </test-case> - </test-group> - - <test-group name="recovery_ddl"> - - <test-case FilePath="recovery_ddl"> - <compilation-unit name="dataverse_recovery"> - <output-dir compare="Text">dataverse_recovery</output-dir> - </compilation-unit> - </test-case> - - <test-case FilePath="recovery_ddl"> - <compilation-unit name="datatype_recovery"> - <output-dir compare="Text">datatype_recovery</output-dir> - </compilation-unit> - </test-case> - - <test-case FilePath="recovery_ddl"> - <compilation-unit name="dataset_recovery"> - <output-dir compare="Text">dataset_recovery</output-dir> - </compilation-unit> - </test-case> - - <test-case FilePath="recovery_ddl"> - <compilation-unit name="secondary_index_recovery"> - <output-dir compare="Text">secondary_index_recovery</output-dir> - </compilation-unit> - </test-case> - - <test-case FilePath="recovery_ddl"> - <compilation-unit name="load_after_recovery"> - <output-dir compare="Text">load_after_recovery</output-dir> - </compilation-unit> - </test-case> - - <test-case FilePath="recovery_ddl"> - <compilation-unit name="insert_after_recovery"> - <output-dir compare="Text">insert_after_recovery</output-dir> - </compilation-unit> - </test-case> - - <test-case FilePath="recovery_ddl"> - <compilation-unit name="delete_after_recovery"> - <output-dir compare="Text">delete_after_recovery</output-dir> - </compilation-unit> - </test-case> - - <test-case FilePath="recovery_ddl"> - <compilation-unit name="function_recovery"> - <output-dir compare="Text">function_recovery</output-dir> - </compilation-unit> - </test-case> - </test-group> - -</test-suite> http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5dcf139e/asterixdb/asterix-metadata/pom.xml ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-metadata/pom.xml b/asterixdb/asterix-metadata/pom.xml index 9784c5e..606b474 100644 --- a/asterixdb/asterix-metadata/pom.xml +++ b/asterixdb/asterix-metadata/pom.xml @@ -56,12 +56,6 @@ <scope>compile</scope> </dependency> <dependency> - <groupId>org.apache.asterix</groupId> - <artifactId>asterix-events</artifactId> - <version>${project.version}</version> - <scope>compile</scope> - </dependency> - <dependency> <groupId>org.apache.hyracks</groupId> <artifactId>hyracks-storage-am-lsm-invertedindex</artifactId> </dependency> @@ -118,10 +112,6 @@ <artifactId>algebricks-core</artifactId> </dependency> <dependency> - <groupId>javax.xml.bind</groupId> - <artifactId>jaxb-api</artifactId> - </dependency> - <dependency> <groupId>org.apache.commons</groupId> <artifactId>commons-lang3</artifactId> </dependency> @@ -190,4 +180,4 @@ <artifactId>hadoop-common</artifactId> </dependency> </dependencies> -</project> \ No newline at end of file +</project> http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5dcf139e/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/api/IClusterManager.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/api/IClusterManager.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/api/IClusterManager.java deleted file mode 100644 index 8459f70..0000000 --- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/api/IClusterManager.java +++ /dev/null @@ -1,59 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.asterix.metadata.api; - -import java.util.Set; - -import org.apache.asterix.common.api.IClusterEventsSubscriber; -import org.apache.asterix.common.dataflow.ICcApplicationContext; -import org.apache.asterix.common.exceptions.AsterixException; -import org.apache.asterix.event.schema.cluster.Node; - -public interface IClusterManager { - - /** - * @param node - * @throws Exception - */ - public void addNode(ICcApplicationContext appCtx, Node node) throws Exception; - - /** - * @param node - * @throws Exception - */ - public void removeNode(Node node) throws Exception; - - /** - * @param subscriber - */ - public void registerSubscriber(IClusterEventsSubscriber subscriber); - - /** - * @param sunscriber - * @return - */ - public boolean deregisterSubscriber(IClusterEventsSubscriber sunscriber); - - /** - * @return - */ - public Set<IClusterEventsSubscriber> getRegisteredClusterEventSubscribers(); - - void notifyStartupCompleted() throws Exception; -} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5dcf139e/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataBootstrap.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataBootstrap.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataBootstrap.java index 0303392..bf8079e 100644 --- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataBootstrap.java +++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataBootstrap.java @@ -65,6 +65,7 @@ import org.apache.asterix.metadata.utils.MetadataConstants; import org.apache.asterix.metadata.utils.MetadataUtil; import org.apache.asterix.om.types.IAType; import org.apache.asterix.runtime.formats.NonTaggedDataFormat; +import org.apache.asterix.runtime.utils.ClusterStateManager; import org.apache.asterix.transaction.management.opcallbacks.PrimaryIndexOperationTrackerFactory; import org.apache.asterix.transaction.management.opcallbacks.SecondaryIndexOperationTrackerFactory; import org.apache.asterix.transaction.management.resource.DatasetLocalResourceFactory; @@ -108,14 +109,14 @@ public class MetadataBootstrap { private static String metadataNodeName; private static List<String> nodeNames; private static boolean isNewUniverse; - private static final IMetadataIndex[] PRIMARY_INDEXES = - new IMetadataIndex[] { MetadataPrimaryIndexes.DATAVERSE_DATASET, MetadataPrimaryIndexes.DATASET_DATASET, - MetadataPrimaryIndexes.DATATYPE_DATASET, MetadataPrimaryIndexes.INDEX_DATASET, - MetadataPrimaryIndexes.NODE_DATASET, MetadataPrimaryIndexes.NODEGROUP_DATASET, - MetadataPrimaryIndexes.FUNCTION_DATASET, MetadataPrimaryIndexes.DATASOURCE_ADAPTER_DATASET, - MetadataPrimaryIndexes.FEED_DATASET, MetadataPrimaryIndexes.FEED_POLICY_DATASET, - MetadataPrimaryIndexes.LIBRARY_DATASET, MetadataPrimaryIndexes.COMPACTION_POLICY_DATASET, - MetadataPrimaryIndexes.EXTERNAL_FILE_DATASET, MetadataPrimaryIndexes.FEED_CONNECTION_DATASET }; + private static final IMetadataIndex[] PRIMARY_INDEXES = new IMetadataIndex[] { + MetadataPrimaryIndexes.DATAVERSE_DATASET, MetadataPrimaryIndexes.DATASET_DATASET, + MetadataPrimaryIndexes.DATATYPE_DATASET, MetadataPrimaryIndexes.INDEX_DATASET, + MetadataPrimaryIndexes.NODE_DATASET, MetadataPrimaryIndexes.NODEGROUP_DATASET, + MetadataPrimaryIndexes.FUNCTION_DATASET, MetadataPrimaryIndexes.DATASOURCE_ADAPTER_DATASET, + MetadataPrimaryIndexes.FEED_DATASET, MetadataPrimaryIndexes.FEED_POLICY_DATASET, + MetadataPrimaryIndexes.LIBRARY_DATASET, MetadataPrimaryIndexes.COMPACTION_POLICY_DATASET, + MetadataPrimaryIndexes.EXTERNAL_FILE_DATASET, MetadataPrimaryIndexes.FEED_CONNECTION_DATASET }; private MetadataBootstrap() { } @@ -265,9 +266,9 @@ public class MetadataBootstrap { private static void insertInitialCompactionPolicies(MetadataTransactionContext mdTxnCtx) throws AlgebricksException { - String[] builtInCompactionPolicyClassNames = - new String[] { ConstantMergePolicyFactory.class.getName(), PrefixMergePolicyFactory.class.getName(), - NoMergePolicyFactory.class.getName(), CorrelatedPrefixMergePolicyFactory.class.getName() }; + String[] builtInCompactionPolicyClassNames = new String[] { ConstantMergePolicyFactory.class.getName(), + PrefixMergePolicyFactory.class.getName(), NoMergePolicyFactory.class.getName(), + CorrelatedPrefixMergePolicyFactory.class.getName() }; for (String policyClassName : builtInCompactionPolicyClassNames) { CompactionPolicy compactionPolicy = getCompactionPolicyEntity(policyClassName); MetadataManager.INSTANCE.addCompactionPolicy(mdTxnCtx, compactionPolicy); @@ -287,8 +288,8 @@ public class MetadataBootstrap { private static CompactionPolicy getCompactionPolicyEntity(String compactionPolicyClassName) throws AlgebricksException { try { - String policyName = - ((ILSMMergePolicyFactory) (Class.forName(compactionPolicyClassName).newInstance())).getName(); + String policyName = ((ILSMMergePolicyFactory) (Class.forName(compactionPolicyClassName).newInstance())) + .getName(); return new CompactionPolicy(MetadataConstants.METADATA_DATAVERSE_NAME, policyName, compactionPolicyClassName); } catch (InstantiationException | IllegalAccessException | ClassNotFoundException e) { @@ -325,13 +326,13 @@ public class MetadataBootstrap { // We are unable to do this since IStorageManager needs a dataset to determine // the appropriate // objects - ILSMOperationTrackerFactory opTrackerFactory = - index.isPrimaryIndex() ? new PrimaryIndexOperationTrackerFactory(datasetId) - : new SecondaryIndexOperationTrackerFactory(datasetId); - ILSMComponentIdGeneratorFactory idGeneratorProvider = - new DatasetLSMComponentIdGeneratorFactory(index.getDatasetId().getId()); - ILSMIOOperationCallbackFactory ioOpCallbackFactory = - new LSMBTreeIOOperationCallbackFactory(idGeneratorProvider); + ILSMOperationTrackerFactory opTrackerFactory = index.isPrimaryIndex() + ? new PrimaryIndexOperationTrackerFactory(datasetId) + : new SecondaryIndexOperationTrackerFactory(datasetId); + ILSMComponentIdGeneratorFactory idGeneratorProvider = new DatasetLSMComponentIdGeneratorFactory( + index.getDatasetId().getId()); + ILSMIOOperationCallbackFactory ioOpCallbackFactory = new LSMBTreeIOOperationCallbackFactory( + idGeneratorProvider); IStorageComponentProvider storageComponentProvider = appContext.getStorageComponentProvider(); if (isNewUniverse()) { LSMBTreeLocalResourceFactory lsmBtreeFactory = new LSMBTreeLocalResourceFactory( @@ -341,8 +342,8 @@ public class MetadataBootstrap { storageComponentProvider.getIoOperationSchedulerProvider(), appContext.getMetadataMergePolicyFactory(), GlobalConfig.DEFAULT_COMPACTION_POLICY_PROPERTIES, true, bloomFilterKeyFields, appContext.getBloomFilterFalsePositiveRate(), true, null); - DatasetLocalResourceFactory dsLocalResourceFactory = - new DatasetLocalResourceFactory(datasetId, lsmBtreeFactory); + DatasetLocalResourceFactory dsLocalResourceFactory = new DatasetLocalResourceFactory(datasetId, + lsmBtreeFactory); // TODO(amoudi) Creating the index should be done through the same code path as // other indexes // This is to be done by having a metadata dataset associated with each index @@ -362,8 +363,8 @@ public class MetadataBootstrap { if (index.getResourceId() != resource.getId()) { throw new HyracksDataException("Resource Id doesn't match expected metadata index resource id"); } - IndexDataflowHelper indexHelper = - new IndexDataflowHelper(ncServiceCtx, storageComponentProvider.getStorageManager(), file); + IndexDataflowHelper indexHelper = new IndexDataflowHelper(ncServiceCtx, + storageComponentProvider.getStorageManager(), file); indexHelper.open(); // Opening the index through the helper will ensure it gets instantiated indexHelper.close(); } @@ -418,8 +419,8 @@ public class MetadataBootstrap { LOGGER.info("Dropped a pending dataverse: " + dataverse.getDataverseName()); } } else { - List<Dataset> datasets = - MetadataManager.INSTANCE.getDataverseDatasets(mdTxnCtx, dataverse.getDataverseName()); + List<Dataset> datasets = MetadataManager.INSTANCE.getDataverseDatasets(mdTxnCtx, + dataverse.getDataverseName()); for (Dataset dataset : datasets) { recoverDataset(mdTxnCtx, dataset); } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5dcf139e/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/cluster/ClusterManager.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/cluster/ClusterManager.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/cluster/ClusterManager.java deleted file mode 100644 index c1c19a4..0000000 --- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/cluster/ClusterManager.java +++ /dev/null @@ -1,168 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.asterix.metadata.cluster; - -import java.io.File; -import java.util.ArrayList; -import java.util.HashSet; -import java.util.List; -import java.util.Set; -import java.util.logging.Level; -import java.util.logging.Logger; - -import javax.xml.bind.JAXBContext; -import javax.xml.bind.Unmarshaller; - -import org.apache.asterix.common.api.IClusterEventsSubscriber; -import org.apache.asterix.common.api.IClusterManagementWork; -import org.apache.asterix.common.config.ClusterProperties; -import org.apache.asterix.common.config.ExternalProperties; -import org.apache.asterix.common.dataflow.ICcApplicationContext; -import org.apache.asterix.common.exceptions.AsterixException; -import org.apache.asterix.event.management.AsterixEventServiceClient; -import org.apache.asterix.event.model.AsterixInstance; -import org.apache.asterix.event.schema.cluster.Cluster; -import org.apache.asterix.event.schema.cluster.Node; -import org.apache.asterix.event.schema.pattern.Pattern; -import org.apache.asterix.event.schema.pattern.Patterns; -import org.apache.asterix.event.service.AsterixEventService; -import org.apache.asterix.event.service.AsterixEventServiceUtil; -import org.apache.asterix.event.service.ILookupService; -import org.apache.asterix.event.service.ServiceProvider; -import org.apache.asterix.event.util.PatternCreator; -import org.apache.asterix.installer.schema.conf.Configuration; -import org.apache.asterix.metadata.api.IClusterManager; - -public class ClusterManager implements IClusterManager { - - private static final Logger LOGGER = Logger.getLogger(ClusterManager.class.getName()); - - public static final IClusterManager INSTANCE = ClusterManagerProvider.getClusterManager(); - - private final AsterixEventServiceClient client; - - private final ILookupService lookupService; - - private final Set<IClusterEventsSubscriber> eventSubscribers = new HashSet<>(); - - ClusterManager(String eventHome) { - String asterixDir = System.getProperty("user.dir") + File.separator + "asterix"; - File configFile = new File(System.getProperty("user.dir") + File.separator + "configuration.xml"); - Configuration configuration = null; - - try { - JAXBContext configCtx = JAXBContext.newInstance(Configuration.class); - Unmarshaller unmarshaller = configCtx.createUnmarshaller(); - configuration = (Configuration) unmarshaller.unmarshal(configFile); - AsterixEventService.initialize(configuration, asterixDir, eventHome); - client = AsterixEventService.getAsterixEventServiceClient(ClusterProperties.INSTANCE.getCluster()); - - lookupService = ServiceProvider.INSTANCE.getLookupService(); - if (!lookupService.isRunning(configuration)) { - if (LOGGER.isLoggable(Level.INFO)) { - LOGGER.info("Lookup service not running. Starting lookup service ..."); - } - lookupService.startService(configuration); - } else { - if (LOGGER.isLoggable(Level.INFO)) { - LOGGER.info("Lookup service running"); - } - } - - } catch (Exception e) { - throw new IllegalStateException("Unable to initialize cluster manager" + e); - } - } - - @Override - public void addNode(ICcApplicationContext appCtx, Node node) throws Exception { - Cluster cluster = ClusterProperties.INSTANCE.getCluster(); - List<Pattern> pattern = new ArrayList<>(); - String asterixInstanceName = appCtx.getMetadataProperties().getInstanceName(); - Patterns prepareNode = PatternCreator.INSTANCE.createPrepareNodePattern(asterixInstanceName, - ClusterProperties.INSTANCE.getCluster(), node); - cluster.getNode().add(node); - client.submit(prepareNode); - - ExternalProperties externalProps = appCtx.getExternalProperties(); - AsterixEventServiceUtil.poulateClusterEnvironmentProperties(cluster, externalProps.getCCJavaParams(), - externalProps.getNCJavaParams()); - - pattern.clear(); - String ccHost = cluster.getMasterNode().getClusterIp(); - String hostId = node.getId(); - String nodeControllerId = asterixInstanceName + "_" + node.getId(); - String iodevices = node.getIodevices() == null ? cluster.getIodevices() : node.getIodevices(); - Pattern startNC = - PatternCreator.INSTANCE.createNCStartPattern(ccHost, hostId, nodeControllerId, iodevices, false); - pattern.add(startNC); - Patterns startNCPattern = new Patterns(pattern); - client.submit(startNCPattern); - - removeNode(cluster.getSubstituteNodes().getNode(), node); - - AsterixInstance instance = lookupService.getAsterixInstance(cluster.getInstanceName()); - instance.getCluster().getNode().add(node); - removeNode(instance.getCluster().getSubstituteNodes().getNode(), node); - lookupService.updateAsterixInstance(instance); - } - - private void removeNode(List<Node> list, Node node) { - Node nodeToRemove = null; - for (Node n : list) { - if (n.getId().equals(node.getId())) { - nodeToRemove = n; - break; - } - } - if (nodeToRemove != null) { - boolean removed = list.remove(nodeToRemove); - if (LOGGER.isLoggable(Level.INFO)) { - LOGGER.info("attempt to remove node :" + nodeToRemove + " successful " + removed); - } - } - } - - @Override - public void removeNode(Node node) { - // to be implemented later. - } - - @Override - public void registerSubscriber(IClusterEventsSubscriber subscriber) { - eventSubscribers.add(subscriber); - } - - @Override - public boolean deregisterSubscriber(IClusterEventsSubscriber subscriber) { - return eventSubscribers.remove(subscriber); - } - - @Override - public Set<IClusterEventsSubscriber> getRegisteredClusterEventSubscribers() { - return eventSubscribers; - } - - @Override - public void notifyStartupCompleted() throws Exception { - // Notify Zookeeper that the startup is complete - lookupService.reportClusterState(ClusterProperties.INSTANCE.getCluster().getInstanceName(), - IClusterManagementWork.ClusterState.ACTIVE); - } -} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5dcf139e/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/cluster/ClusterManagerProvider.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/cluster/ClusterManagerProvider.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/cluster/ClusterManagerProvider.java deleted file mode 100644 index 2b7c9df..0000000 --- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/cluster/ClusterManagerProvider.java +++ /dev/null @@ -1,90 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.asterix.metadata.cluster; - -import java.util.Collections; -import java.util.Set; - -import org.apache.asterix.common.api.IClusterEventsSubscriber; -import org.apache.asterix.common.config.ClusterProperties; -import org.apache.asterix.common.dataflow.ICcApplicationContext; -import org.apache.asterix.common.exceptions.AsterixException; -import org.apache.asterix.event.schema.cluster.Cluster; -import org.apache.asterix.event.schema.cluster.Node; -import org.apache.asterix.metadata.api.IClusterManager; - -public class ClusterManagerProvider { - - private ClusterManagerProvider() { - } - - public static IClusterManager getClusterManager() { - return Holder.INSTANCE; - } - - private static final class Holder { - static final IClusterManager INSTANCE; - - static { - Cluster asterixCluster = ClusterProperties.INSTANCE.getCluster(); - String eventHome = asterixCluster == null ? null - : asterixCluster.getWorkingDir() == null ? null : asterixCluster.getWorkingDir().getDir(); - - if (eventHome != null) { - INSTANCE = new ClusterManager(eventHome); - } else { - INSTANCE = new NoopClusterManager(); - } - } - - private Holder() { - } - } - private static class NoopClusterManager implements IClusterManager { - @Override - public void addNode(ICcApplicationContext appCtx, Node node) { - // no-op - } - - @Override - public void removeNode(Node node) { - // no-op - } - - @Override - public void registerSubscriber(IClusterEventsSubscriber subscriber) { - // no-op - } - - @Override - public boolean deregisterSubscriber(IClusterEventsSubscriber sunscriber) { - return true; - } - - @Override - public Set<IClusterEventsSubscriber> getRegisteredClusterEventSubscribers() { - return Collections.emptySet(); - } - - @Override - public void notifyStartupCompleted() { - // no-op - } - } -} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5dcf139e/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java index 1e0d597..0398f1a 100644 --- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java +++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java @@ -284,8 +284,8 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId, String> } public Dataset findDataset(String dataverse, String dataset) throws AlgebricksException { - String dv = - dataverse == null ? (defaultDataverse == null ? null : defaultDataverse.getDataverseName()) : dataverse; + String dv = dataverse == null ? (defaultDataverse == null ? null : defaultDataverse.getDataverseName()) + : dataverse; if (dv == null) { return null; } @@ -409,8 +409,8 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId, String> policyAccessor, factoryOutput.second); break; case EXTERNAL: - String libraryName = - feed.getAdapterName().trim().split(FeedConstants.NamingConstants.LIBRARY_NAME_SEPARATOR)[0]; + String libraryName = feed.getAdapterName().trim() + .split(FeedConstants.NamingConstants.LIBRARY_NAME_SEPARATOR)[0]; feedIngestor = new FeedIntakeOperatorDescriptor(jobSpec, feed, libraryName, adapterFactory.getClass().getName(), recordType, policyAccessor, factoryOutput.second); break; @@ -433,19 +433,21 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId, String> if (primaryIndex != null && (dataset.getDatasetType() != DatasetType.EXTERNAL)) { isSecondary = !indexName.equals(primaryIndex.getIndexName()); } - Index theIndex = isSecondary ? MetadataManager.INSTANCE.getIndex(mdTxnCtx, dataset.getDataverseName(), - dataset.getDatasetName(), indexName) : primaryIndex; + Index theIndex = isSecondary + ? MetadataManager.INSTANCE.getIndex(mdTxnCtx, dataset.getDataverseName(), dataset.getDatasetName(), + indexName) + : primaryIndex; int numPrimaryKeys = dataset.getPrimaryKeys().size(); RecordDescriptor outputRecDesc = JobGenHelper.mkRecordDescriptor(typeEnv, opSchema, context); - Pair<IFileSplitProvider, AlgebricksPartitionConstraint> spPc = - getSplitProviderAndConstraints(dataset, theIndex.getIndexName()); + Pair<IFileSplitProvider, AlgebricksPartitionConstraint> spPc = getSplitProviderAndConstraints(dataset, + theIndex.getIndexName()); int[] primaryKeyFields = new int[numPrimaryKeys]; for (int i = 0; i < numPrimaryKeys; i++) { primaryKeyFields[i] = i; } - ISearchOperationCallbackFactory searchCallbackFactory = dataset.getSearchCallbackFactory( - storageComponentProvider, theIndex, IndexOperation.SEARCH, primaryKeyFields); + ISearchOperationCallbackFactory searchCallbackFactory = dataset + .getSearchCallbackFactory(storageComponentProvider, theIndex, IndexOperation.SEARCH, primaryKeyFields); IStorageManager storageManager = getStorageComponentProvider().getStorageManager(); IIndexDataflowHelperFactory indexHelperFactory = new IndexDataflowHelperFactory(storageManager, spPc.first); BTreeSearchOperatorDescriptor btreeSearchOp; @@ -476,8 +478,8 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId, String> "Code generation error: no index " + indexName + " for dataset " + dataset.getDatasetName()); } RecordDescriptor outputRecDesc = JobGenHelper.mkRecordDescriptor(typeEnv, opSchema, context); - Pair<IFileSplitProvider, AlgebricksPartitionConstraint> spPc = - getSplitProviderAndConstraints(dataset, secondaryIndex.getIndexName()); + Pair<IFileSplitProvider, AlgebricksPartitionConstraint> spPc = getSplitProviderAndConstraints(dataset, + secondaryIndex.getIndexName()); int[] primaryKeyFields = new int[numPrimaryKeys]; for (int i = 0; i < numPrimaryKeys; i++) { primaryKeyFields[i] = i; @@ -486,8 +488,8 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId, String> ISearchOperationCallbackFactory searchCallbackFactory = dataset.getSearchCallbackFactory( storageComponentProvider, secondaryIndex, IndexOperation.SEARCH, primaryKeyFields); RTreeSearchOperatorDescriptor rtreeSearchOp; - IIndexDataflowHelperFactory indexDataflowHelperFactory = - new IndexDataflowHelperFactory(storageComponentProvider.getStorageManager(), spPc.first); + IIndexDataflowHelperFactory indexDataflowHelperFactory = new IndexDataflowHelperFactory( + storageComponentProvider.getStorageManager(), spPc.first); if (dataset.getDatasetType() == DatasetType.INTERNAL) { rtreeSearchOp = new RTreeSearchOperatorDescriptor(jobSpec, outputRecDesc, keyFields, true, true, indexDataflowHelperFactory, retainInput, retainMissing, context.getMissingWriterFactory(), @@ -512,8 +514,8 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId, String> File outFile = new File(fs.getPath()); String nodeId = fs.getNodeName(); - SinkWriterRuntimeFactory runtime = - new SinkWriterRuntimeFactory(printColumns, printerFactories, outFile, getWriterFactory(), inputDesc); + SinkWriterRuntimeFactory runtime = new SinkWriterRuntimeFactory(printColumns, printerFactories, outFile, + getWriterFactory(), inputDesc); AlgebricksPartitionConstraint apc = new AlgebricksAbsolutePartitionConstraint(new String[] { nodeId }); return new Pair<>(runtime, apc); } @@ -562,16 +564,16 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId, String> fieldPermutation[numKeys + 1] = idx; } - Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint = - getSplitProviderAndConstraints(dataset); + Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint = getSplitProviderAndConstraints( + dataset); long numElementsHint = getCardinalityPerPartitionHint(dataset); // TODO // figure out the right behavior of the bulkload and then give the // right callback // (ex. what's the expected behavior when there is an error during // bulkload?) - IIndexDataflowHelperFactory indexHelperFactory = - new IndexDataflowHelperFactory(storageComponentProvider.getStorageManager(), splitsAndConstraint.first); + IIndexDataflowHelperFactory indexHelperFactory = new IndexDataflowHelperFactory( + storageComponentProvider.getStorageManager(), splitsAndConstraint.first); LSMIndexBulkLoadOperatorDescriptor btreeBulkLoad = new LSMIndexBulkLoadOperatorDescriptor(spec, null, fieldPermutation, GlobalConfig.DEFAULT_TREE_FILL_FACTOR, false, numElementsHint, true, indexHelperFactory, null, BulkLoadUsage.LOAD, dataset.getDatasetId()); @@ -688,8 +690,8 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId, String> numElementsHint = Long.parseLong(numElementsHintString); } int numPartitions = 0; - List<String> nodeGroup = - MetadataManager.INSTANCE.getNodegroup(mdTxnCtx, dataset.getNodeGroupName()).getNodeNames(); + List<String> nodeGroup = MetadataManager.INSTANCE.getNodegroup(mdTxnCtx, dataset.getNodeGroupName()) + .getNodeNames(); IClusterStateManager csm = appCtx.getClusterStateManager(); for (String nd : nodeGroup) { numPartitions += csm.getNodePartitionsCount(nd); @@ -705,9 +707,9 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId, String> getApplicationContext().getServiceContext(), adapterName, configuration, itemType, metaType); // check to see if dataset is indexed - Index filesIndex = - MetadataManager.INSTANCE.getIndex(mdTxnCtx, dataset.getDataverseName(), dataset.getDatasetName(), - dataset.getDatasetName().concat(IndexingConstants.EXTERNAL_FILE_INDEX_NAME_SUFFIX)); + Index filesIndex = MetadataManager.INSTANCE.getIndex(mdTxnCtx, dataset.getDataverseName(), + dataset.getDatasetName(), + dataset.getDatasetName().concat(IndexingConstants.EXTERNAL_FILE_INDEX_NAME_SUFFIX)); if (filesIndex != null && filesIndex.getPendingOp() == 0) { // get files @@ -743,7 +745,7 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId, String> public FileSplit[] splitsForIndex(MetadataTransactionContext mdTxnCtx, Dataset dataset, String indexName) throws AlgebricksException { - return SplitsAndConstraintsUtil.getIndexSplits(appCtx.getClusterStateManager(), dataset, indexName, mdTxnCtx); + return SplitsAndConstraintsUtil.getIndexSplits(dataset, indexName, mdTxnCtx, appCtx.getClusterStateManager()); } public DatasourceAdapter getAdapter(MetadataTransactionContext mdTxnCtx, String dataverseName, String adapterName) @@ -769,21 +771,22 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId, String> MetadataProvider metadataProvider, boolean retainMissing) throws AlgebricksException { try { // Get data type - ARecordType itemType = - (ARecordType) MetadataManager.INSTANCE.getDatatype(metadataProvider.getMetadataTxnContext(), - dataset.getDataverseName(), dataset.getItemTypeName()).getDatatype(); + ARecordType itemType = (ARecordType) MetadataManager.INSTANCE + .getDatatype(metadataProvider.getMetadataTxnContext(), dataset.getDataverseName(), + dataset.getItemTypeName()) + .getDatatype(); ExternalDatasetDetails datasetDetails = (ExternalDatasetDetails) dataset.getDatasetDetails(); LookupAdapterFactory<?> adapterFactory = AdapterFactoryProvider.getLookupAdapterFactory( getApplicationContext().getServiceContext(), datasetDetails.getProperties(), itemType, ridIndexes, retainInput, retainMissing, context.getMissingWriterFactory()); String fileIndexName = IndexingConstants.getFilesIndexName(dataset.getDatasetName()); - Pair<IFileSplitProvider, AlgebricksPartitionConstraint> spPc = - metadataProvider.getSplitProviderAndConstraints(dataset, fileIndexName); + Pair<IFileSplitProvider, AlgebricksPartitionConstraint> spPc = metadataProvider + .getSplitProviderAndConstraints(dataset, fileIndexName); Index fileIndex = MetadataManager.INSTANCE.getIndex(mdTxnCtx, dataset.getDataverseName(), dataset.getDatasetName(), fileIndexName); // Create the file index data flow helper - IIndexDataflowHelperFactory indexDataflowHelperFactory = - new IndexDataflowHelperFactory(storageComponentProvider.getStorageManager(), spPc.first); + IIndexDataflowHelperFactory indexDataflowHelperFactory = new IndexDataflowHelperFactory( + storageComponentProvider.getStorageManager(), spPc.first); // Create the out record descriptor, appContext and fileSplitProvider for the // files index RecordDescriptor outRecDesc = JobGenHelper.mkRecordDescriptor(typeEnv, opSchema, context); @@ -849,12 +852,12 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId, String> throw new AlgebricksException("Can only scan datasets of records."); } - ISerializerDeserializer<?> payloadSerde = - getDataFormat().getSerdeProvider().getSerializerDeserializer(itemType); + ISerializerDeserializer<?> payloadSerde = getDataFormat().getSerdeProvider() + .getSerializerDeserializer(itemType); RecordDescriptor scannerDesc = new RecordDescriptor(new ISerializerDeserializer[] { payloadSerde }); - ExternalScanOperatorDescriptor dataScanner = - new ExternalScanOperatorDescriptor(jobSpec, scannerDesc, adapterFactory); + ExternalScanOperatorDescriptor dataScanner = new ExternalScanOperatorDescriptor(jobSpec, scannerDesc, + adapterFactory); AlgebricksPartitionConstraint constraint; try { @@ -880,9 +883,9 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId, String> int i = 0; for (; i < sidxKeyFieldCount; ++i) { - Pair<IAType, Boolean> keyPairType = - Index.getNonNullableOpenFieldType(sidxKeyFieldTypes.get(i), sidxKeyFieldNames.get(i), - (hasMeta && secondaryIndexIndicators.get(i).intValue() == 1) ? metaType : recType); + Pair<IAType, Boolean> keyPairType = Index.getNonNullableOpenFieldType(sidxKeyFieldTypes.get(i), + sidxKeyFieldNames.get(i), + (hasMeta && secondaryIndexIndicators.get(i).intValue() == 1) ? metaType : recType); IAType keyType = keyPairType.first; comparatorFactories[i] = BinaryComparatorFactoryProvider.INSTANCE.getBinaryComparatorFactory(keyType, true); typeTraits[i] = TypeTraitProvider.INSTANCE.getTypeTrait(keyType); @@ -920,8 +923,8 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId, String> List<LogicalVariable> additionalNonFilteringFields) throws AlgebricksException { String datasetName = dataSource.getId().getDatasourceName(); - Dataset dataset = - MetadataManagerUtil.findExistingDataset(mdTxnCtx, dataSource.getId().getDataverseName(), datasetName); + Dataset dataset = MetadataManagerUtil.findExistingDataset(mdTxnCtx, dataSource.getId().getDataverseName(), + datasetName); int numKeys = keys.size(); int numFilterFields = DatasetUtil.getFilterField(dataset) == null ? 0 : 1; // Move key fields to front. @@ -949,18 +952,18 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId, String> Index primaryIndex = MetadataManager.INSTANCE.getIndex(mdTxnCtx, dataset.getDataverseName(), dataset.getDatasetName(), dataset.getDatasetName()); - Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint = - getSplitProviderAndConstraints(dataset); + Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint = getSplitProviderAndConstraints( + dataset); // prepare callback int[] primaryKeyFields = new int[numKeys]; for (i = 0; i < numKeys; i++) { primaryKeyFields[i] = i; } - IModificationOperationCallbackFactory modificationCallbackFactory = dataset.getModificationCallbackFactory( - storageComponentProvider, primaryIndex, indexOp, primaryKeyFields); - IIndexDataflowHelperFactory idfh = - new IndexDataflowHelperFactory(storageComponentProvider.getStorageManager(), splitsAndConstraint.first); + IModificationOperationCallbackFactory modificationCallbackFactory = dataset + .getModificationCallbackFactory(storageComponentProvider, primaryIndex, indexOp, primaryKeyFields); + IIndexDataflowHelperFactory idfh = new IndexDataflowHelperFactory(storageComponentProvider.getStorageManager(), + splitsAndConstraint.first); IOperatorDescriptor op; if (bulkload) { long numElementsHint = getCardinalityPerPartitionHint(dataset); @@ -1119,8 +1122,8 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId, String> dataset.getDatasetName(), indexName); List<List<String>> secondaryKeyExprs = secondaryIndex.getKeyFieldNames(); List<IAType> secondaryKeyTypes = secondaryIndex.getKeyFieldTypes(); - Pair<IAType, Boolean> keyPairType = - Index.getNonNullableOpenFieldType(secondaryKeyTypes.get(0), secondaryKeyExprs.get(0), recType); + Pair<IAType, Boolean> keyPairType = Index.getNonNullableOpenFieldType(secondaryKeyTypes.get(0), + secondaryKeyExprs.get(0), recType); IAType spatialType = keyPairType.first; int dimension = NonTaggedFormatUtil.getNumDimensions(spatialType.getTypeTag()); int numSecondaryKeys = dimension * 2; @@ -1173,14 +1176,14 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId, String> prevFieldPermutation[numKeys] = idx; } } - Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint = - getSplitProviderAndConstraints(dataset, secondaryIndex.getIndexName()); + Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint = getSplitProviderAndConstraints( + dataset, secondaryIndex.getIndexName()); // prepare callback IModificationOperationCallbackFactory modificationCallbackFactory = dataset.getModificationCallbackFactory( storageComponentProvider, secondaryIndex, indexOp, modificationCallbackPrimaryKeyFields); - IIndexDataflowHelperFactory indexDataflowHelperFactory = - new IndexDataflowHelperFactory(storageComponentProvider.getStorageManager(), splitsAndConstraint.first); + IIndexDataflowHelperFactory indexDataflowHelperFactory = new IndexDataflowHelperFactory( + storageComponentProvider.getStorageManager(), splitsAndConstraint.first); IOperatorDescriptor op; if (bulkload) { long numElementsHint = getCardinalityPerPartitionHint(dataset); @@ -1501,8 +1504,8 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId, String> return null; } IExpressionRuntimeProvider expressionRuntimeProvider = context.getExpressionRuntimeProvider(); - IScalarEvaluatorFactory filterEvalFactory = - expressionRuntimeProvider.createEvaluatorFactory(filterExpr, typeEnv, inputSchemas, context); + IScalarEvaluatorFactory filterEvalFactory = expressionRuntimeProvider.createEvaluatorFactory(filterExpr, + typeEnv, inputSchemas, context); return new AsterixTupleFilterFactory(filterEvalFactory, context.getBinaryBooleanInspectorFactory()); } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5dcf139e/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SplitsAndConstraintsUtil.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SplitsAndConstraintsUtil.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SplitsAndConstraintsUtil.java index 7ac6183..2a6d0e8 100644 --- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SplitsAndConstraintsUtil.java +++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SplitsAndConstraintsUtil.java @@ -24,6 +24,7 @@ import java.util.List; import org.apache.asterix.common.cluster.ClusterPartition; import org.apache.asterix.common.cluster.IClusterStateManager; +import org.apache.asterix.common.exceptions.MetadataException; import org.apache.asterix.common.utils.StoragePathUtil; import org.apache.asterix.metadata.MetadataManager; import org.apache.asterix.metadata.MetadataTransactionContext; @@ -52,14 +53,18 @@ public class SplitsAndConstraintsUtil { return splits.toArray(new FileSplit[] {}); } - public static FileSplit[] getIndexSplits(IClusterStateManager clusterStateManager, Dataset dataset, - String indexName, MetadataTransactionContext mdTxnCtx) throws AlgebricksException { - NodeGroup nodeGroup = MetadataManager.INSTANCE.getNodegroup(mdTxnCtx, dataset.getNodeGroupName()); - if (nodeGroup == null) { - throw new AlgebricksException("Couldn't find node group " + dataset.getNodeGroupName()); + public static FileSplit[] getIndexSplits(Dataset dataset, String indexName, MetadataTransactionContext mdTxnCtx, + IClusterStateManager csm) throws AlgebricksException { + try { + NodeGroup nodeGroup = MetadataManager.INSTANCE.getNodegroup(mdTxnCtx, dataset.getNodeGroupName()); + if (nodeGroup == null) { + throw new AlgebricksException("Couldn't find node group " + dataset.getNodeGroupName()); + } + List<String> nodeList = nodeGroup.getNodeNames(); + return getIndexSplits(csm, dataset, indexName, nodeList); + } catch (MetadataException me) { + throw new AlgebricksException(me); } - List<String> nodeList = nodeGroup.getNodeNames(); - return getIndexSplits(clusterStateManager, dataset, indexName, nodeList); } public static FileSplit[] getIndexSplits(IClusterStateManager clusterStateManager, Dataset dataset, http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5dcf139e/asterixdb/asterix-replication/pom.xml ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-replication/pom.xml b/asterixdb/asterix-replication/pom.xml index 2b5fe0c..41afc1d 100644 --- a/asterixdb/asterix-replication/pom.xml +++ b/asterixdb/asterix-replication/pom.xml @@ -72,6 +72,14 @@ <version>${project.version}</version> </dependency> <dependency> + <groupId>org.apache.hyracks</groupId> + <artifactId>hyracks-control-nc</artifactId> + </dependency> + <dependency> + <groupId>org.apache.hyracks</groupId> + <artifactId>hyracks-control-common</artifactId> + </dependency> + <dependency> <groupId>com.fasterxml.jackson.core</groupId> <artifactId>jackson-core</artifactId> </dependency> http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5dcf139e/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicaStateChecker.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicaStateChecker.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicaStateChecker.java index be7cdc5..e260de5 100644 --- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicaStateChecker.java +++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicaStateChecker.java @@ -25,7 +25,6 @@ import java.nio.channels.SocketChannel; import java.nio.channels.UnresolvedAddressException; import java.util.concurrent.Callable; -import org.apache.asterix.common.config.ReplicationProperties; import org.apache.asterix.common.replication.Replica; import org.apache.asterix.common.replication.Replica.ReplicaState; import org.apache.asterix.replication.functions.ReplicationProtocol; @@ -36,15 +35,13 @@ public class ReplicaStateChecker implements Callable<Void> { private final Replica replica; private final int replicationTimeOut; private final ReplicationManager replicationManager; - private final ReplicationProperties asterixReplicationProperties; private final boolean suspendReplication; public ReplicaStateChecker(Replica replica, int replicationTimeOut, ReplicationManager replicationManager, - ReplicationProperties asterixReplicationProperties, boolean suspendReplication) { + boolean suspendReplication) { this.replica = replica; this.replicationTimeOut = replicationTimeOut; this.replicationManager = replicationManager; - this.asterixReplicationProperties = asterixReplicationProperties; this.suspendReplication = suspendReplication; } @@ -53,7 +50,8 @@ public class ReplicaStateChecker implements Callable<Void> { Thread.currentThread().setName("ReplicaConnector Thread"); long startTime = System.currentTimeMillis(); - InetSocketAddress replicaAddress = replica.getAddress(asterixReplicationProperties); + InetSocketAddress replicaAddress = InetSocketAddress.createUnresolved(replica.getClusterIp(), + replica.getPort()); while (true) { try (SocketChannel connection = SocketChannel.open()) { http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5dcf139e/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationChannel.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationChannel.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationChannel.java index ba24e07..32e9498 100644 --- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationChannel.java +++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationChannel.java @@ -28,6 +28,7 @@ import java.nio.channels.AsynchronousCloseException; import java.nio.channels.FileChannel; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; +import java.util.ArrayList; import java.util.HashSet; import java.util.List; import java.util.Map; @@ -54,6 +55,7 @@ import org.apache.asterix.common.replication.IReplicationChannel; import org.apache.asterix.common.replication.IReplicationManager; import org.apache.asterix.common.replication.IReplicationStrategy; import org.apache.asterix.common.replication.IReplicationThread; +import org.apache.asterix.common.replication.Replica; import org.apache.asterix.common.replication.ReplicaEvent; import org.apache.asterix.common.storage.DatasetResourceReference; import org.apache.asterix.common.storage.IIndexCheckpointManager; @@ -82,6 +84,8 @@ import org.apache.asterix.replication.storage.ReplicaResourcesManager; import org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepository; import org.apache.asterix.transaction.management.service.logging.LogBuffer; import org.apache.hyracks.api.application.INCServiceContext; +import org.apache.hyracks.control.common.controllers.NCConfig; +import org.apache.hyracks.control.nc.NodeControllerService; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.storage.am.lsm.common.api.LSMOperationType; import org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMIndex; @@ -116,12 +120,16 @@ public class ReplicationChannel extends Thread implements IReplicationChannel { private final Object flushLogslock = new Object(); private final IDatasetLifecycleManager dsLifecycleManager; private final PersistentLocalResourceRepository localResourceRep; + private final IReplicationStrategy replicationStrategy; + private final NCConfig ncConfig; + private Set nodeHostedPartitions; private final IIndexCheckpointManagerProvider indexCheckpointManagerProvider; private final INcApplicationContext appCtx; public ReplicationChannel(String nodeId, ReplicationProperties replicationProperties, ILogManager logManager, IReplicaResourcesManager replicaResoucesManager, IReplicationManager replicationManager, - INCServiceContext ncServiceContext, IAppRuntimeContextProvider asterixAppRuntimeContextProvider) { + INCServiceContext ncServiceContext, IAppRuntimeContextProvider asterixAppRuntimeContextProvider, + IReplicationStrategy replicationStrategy) { this.logManager = logManager; this.localNodeID = nodeId; this.replicaResourcesManager = (ReplicaResourcesManager) replicaResoucesManager; @@ -129,8 +137,10 @@ public class ReplicationChannel extends Thread implements IReplicationChannel { this.replicationProperties = replicationProperties; this.appContextProvider = asterixAppRuntimeContextProvider; this.dsLifecycleManager = asterixAppRuntimeContextProvider.getDatasetLifecycleManager(); - this.localResourceRep = - (PersistentLocalResourceRepository) asterixAppRuntimeContextProvider.getLocalResourceRepository(); + this.localResourceRep = (PersistentLocalResourceRepository) asterixAppRuntimeContextProvider + .getLocalResourceRepository(); + this.replicationStrategy = replicationStrategy; + this.ncConfig = ((NodeControllerService) ncServiceContext.getControllerService()).getConfiguration(); lsmComponentRemoteLSN2LocalLSNMappingTaskQ = new LinkedBlockingQueue<>(); pendingNotificationRemoteLogsQ = new LinkedBlockingQueue<>(); lsmComponentId2PropertiesMap = new ConcurrentHashMap<>(); @@ -139,21 +149,34 @@ public class ReplicationChannel extends Thread implements IReplicationChannel { lsmComponentLSNMappingService = new LSMComponentsSyncService(); replicationNotifier = new ReplicationNotifier(); replicationThreads = Executors.newCachedThreadPool(ncServiceContext.getThreadFactory()); + Map<String, ClusterPartition[]> nodePartitions = asterixAppRuntimeContextProvider.getAppContext() + .getMetadataProperties().getNodePartitions(); + Set<String> nodeReplicationClients = replicationStrategy.getRemotePrimaryReplicas(nodeId).stream() + .map(Replica::getId).collect(Collectors.toSet()); + List<Integer> clientsPartitions = new ArrayList<>(); + for (String clientId : nodeReplicationClients) { + for (ClusterPartition clusterPartition : nodePartitions.get(clientId)) { + clientsPartitions.add(clusterPartition.getPartitionId()); + } + } + nodeHostedPartitions = new HashSet<>(clientsPartitions.size()); + nodeHostedPartitions.addAll(clientsPartitions); + this.indexCheckpointManagerProvider = + ((INcApplicationContext) ncServiceContext.getApplicationContext()).getIndexCheckpointManagerProvider(); this.appCtx = (INcApplicationContext) ncServiceContext.getApplicationContext(); - this.indexCheckpointManagerProvider = appCtx.getIndexCheckpointManagerProvider(); } @Override public void run() { Thread.currentThread().setName("Replication Channel Thread"); - String nodeIP = replicationProperties.getReplicaIPAddress(localNodeID); - int dataPort = replicationProperties.getDataReplicationPort(localNodeID); + String nodeIP = replicationProperties.getNodeIpFromId(localNodeID); + int dataPort = ncConfig.getReplicationPublicPort(); try { serverSocketChannel = ServerSocketChannel.open(); serverSocketChannel.configureBlocking(true); - InetSocketAddress replicationChannelAddress = - new InetSocketAddress(InetAddress.getByName(nodeIP), dataPort); + InetSocketAddress replicationChannelAddress = new InetSocketAddress(InetAddress.getByName(nodeIP), + dataPort); serverSocketChannel.socket().bind(replicationChannelAddress); lsmComponentLSNMappingService.start(); replicationNotifier.start(); @@ -182,9 +205,8 @@ public class ReplicationChannel extends Thread implements IReplicationChannel { if (remainingFile == 0) { if (lsmCompProp.getOpType() == LSMOperationType.FLUSH && lsmCompProp.getReplicaLSN() != null && replicaUniqueLSN2RemoteMapping.containsKey(lsmCompProp.getNodeUniqueLSN())) { - int remainingIndexes = - replicaUniqueLSN2RemoteMapping.get(lsmCompProp.getNodeUniqueLSN()).numOfFlushedIndexes - .decrementAndGet(); + int remainingIndexes = replicaUniqueLSN2RemoteMapping + .get(lsmCompProp.getNodeUniqueLSN()).numOfFlushedIndexes.decrementAndGet(); if (remainingIndexes == 0) { /** * Note: there is a chance that this will never be removed because some @@ -228,8 +250,8 @@ public class ReplicationChannel extends Thread implements IReplicationChannel { public void run() { Thread.currentThread().setName("Replication Thread"); try { - ReplicationRequestType replicationFunction = - ReplicationProtocol.getRequestType(socketChannel, inBuffer); + ReplicationRequestType replicationFunction = ReplicationProtocol.getRequestType(socketChannel, + inBuffer); while (replicationFunction != ReplicationRequestType.GOODBYE) { switch (replicationFunction) { case REPLICATE_LOG: @@ -275,7 +297,7 @@ public class ReplicationChannel extends Thread implements IReplicationChannel { } } catch (Exception e) { if (LOGGER.isLoggable(Level.WARNING)) { - LOGGER.log(Level.WARNING, "Unexpectedly error during replication.", e); + LOGGER.log(Level.WARNING, "Unexpected error during replication.", e); } } finally { if (socketChannel.isOpen()) { @@ -305,8 +327,8 @@ public class ReplicationChannel extends Thread implements IReplicationChannel { Set<Integer> datasetsToForceFlush = new HashSet<>(); for (IndexInfo iInfo : openIndexesInfo) { if (requestedIndexesToBeFlushed.contains(iInfo.getResourceId())) { - AbstractLSMIOOperationCallback ioCallback = - (AbstractLSMIOOperationCallback) iInfo.getIndex().getIOOperationCallback(); + AbstractLSMIOOperationCallback ioCallback = (AbstractLSMIOOperationCallback) iInfo.getIndex() + .getIOOperationCallback(); //if an index has a pending flush, then the request to flush it will succeed. if (ioCallback.hasPendingFlush()) { //remove index to indicate that it will be flushed @@ -400,12 +422,11 @@ public class ReplicationChannel extends Thread implements IReplicationChannel { List<String> filesList; Set<Integer> partitionIds = request.getPartitionIds(); Set<String> requesterExistingFiles = request.getExistingFiles(); - Map<Integer, ClusterPartition> clusterPartitions = - appContextProvider.getAppContext().getMetadataProperties().getClusterPartitions(); + Map<Integer, ClusterPartition> clusterPartitions = appContextProvider.getAppContext() + .getMetadataProperties().getClusterPartitions(); - final IReplicationStrategy repStrategy = replicationProperties.getReplicationStrategy(); // Flush replicated datasets to generate the latest LSM components - dsLifecycleManager.flushDataset(repStrategy); + dsLifecycleManager.flushDataset(replicationStrategy); for (Integer partitionId : partitionIds) { ClusterPartition partition = clusterPartitions.get(partitionId); filesList = replicaResourcesManager.getPartitionIndexesFiles(partition.getPartitionId(), false); @@ -413,7 +434,7 @@ public class ReplicationChannel extends Thread implements IReplicationChannel { for (String filePath : filesList) { // Send only files of datasets that are replciated. DatasetResourceReference indexFileRef = localResourceRep.getLocalResourceReference(filePath); - if (!repStrategy.isMatch(indexFileRef.getDatasetId())) { + if (!replicationStrategy.isMatch(indexFileRef.getDatasetId())) { continue; } String relativeFilePath = StoragePathUtil.getIndexFileRelativePath(filePath);
