This is an automated email from the ASF dual-hosted git repository.
ming pushed a commit to branch master
in repository
https://gitbox.apache.org/repos/asf/incubator-hugegraph-computer.git
The following commit(s) were added to refs/heads/master by this push:
new 89a8f9b1 refact(core): adaptor for common 1.2 & fix a string of
possible CI problems (#286)
89a8f9b1 is described below
commit 89a8f9b1f78ea78b8b9f29ad64ba5aeafff3bfac
Author: imbajin <[email protected]>
AuthorDate: Mon Dec 11 15:07:38 2023 +0800
refact(core): adaptor for common 1.2 & fix a string of possible CI problems
(#286)
* chore(ci): update common 1.2 & upgrade action version
* replace HTTP_CODE
* Update codeql-analysis.yml
* Update pom.xml
* use docker image for server
* update webmockserver version to adapt okhttp4.x
* remove useless client
* Revert "remove useless client"
This reverts commit 3e894d34e31ce2a9e0366c0c8dd7cb66af6cc7cb.
* chore: replace loading data with docker & fix AssertThrows
* update loader image name
* Update load-data-into-hugegraph.sh
* Update load-data-into-hugegraph.sh
* Update load-data-into-hugegraph.sh
* change path
* Update load-data-into-hugegraph.sh
* Update load-data-into-hugegraph.sh
* Update load-data-into-hugegraph.sh
* Update load-data-into-hugegraph.sh
* Update load-data-into-hugegraph.sh
* Update load-data-into-hugegraph.sh
* Update load-data-into-hugegraph.sh
* fix name
* Update load-data-into-hugegraph.sh
* update ci logic
* Update load-data-into-hugegraph.sh
* Update load-data-into-hugegraph.sh
* Update load-data-into-hugegraph.sh
* Update WorkerServiceTest.java
* update data path
* refactor WorkerServer
* update the WorkerService init & close logic
* Update MasterService.java
* CI passed & revert the ignore
* Update MessageRecvManagerTest.java
* enable 3rd party check
* Update license-checker.yml
* Update
computer-core/src/main/java/org/apache/hugegraph/computer/core/worker/WorkerService.java
Co-authored-by: Cong Zhao <[email protected]>
* Update MasterService.java
* tiny fix
---------
Co-authored-by: Cong Zhao <[email protected]>
---
.github/configs/settings.xml | 60 ++++++
.github/workflows/ci.yml | 30 +--
.github/workflows/codeql-analysis.yml | 70 ++++---
.github/workflows/license-checker.yml | 50 ++---
.github/workflows/stale.yml | 2 +-
checkstyle.xml | 6 +-
.../hugegraph/computer/core/common/Constants.java | 2 +-
.../hugegraph/computer/core/bsp/EtcdClient.java | 93 ++++-----
.../computer/core/master/MasterService.java | 42 ++--
.../core/network/session/ClientSession.java | 10 +-
.../computer/core/worker/WorkerService.java | 187 +++++++++---------
computer-dist/src/assembly/dataset/struct.json | 6 +-
.../travis/install-hugegraph-from-source.sh | 3 +
computer-dist/src/assembly/travis/install-k8s.sh | 1 +
.../assembly/travis/load-data-into-hugegraph.sh | 44 +++--
computer-dist/src/assembly/travis/start-etcd.sh | 3 +-
.../computer/k8s/operator/OperatorEntrypoint.java | 50 +++--
computer-test/pom.xml | 32 +++
.../network/netty/NettyTransportClientTest.java | 110 ++++-------
.../core/receiver/MessageRecvManagerTest.java | 46 ++---
.../computer/core/worker/WorkerServiceTest.java | 214 +++++++++------------
.../computer/k8s/KubernetesDriverTest.java | 104 ++++------
pom.xml | 26 ++-
23 files changed, 615 insertions(+), 576 deletions(-)
diff --git a/.github/configs/settings.xml b/.github/configs/settings.xml
new file mode 100644
index 00000000..294ded1c
--- /dev/null
+++ b/.github/configs/settings.xml
@@ -0,0 +1,60 @@
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ -->
+
+<settings xmlns="http://maven.apache.org/SETTINGS/1.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/SETTINGS/1.0.0
https://maven.apache.org/xsd/settings-1.0.0.xsd">
+ <servers>
+ <server>
+ <id>github</id>
+ <username>${env.GITHUB_ACTOR}</username>
+ <password>${env.GITHUB_TOKEN}</password>
+ </server>
+ </servers>
+
+ <profiles>
+ <profile>
+ <id>local-repo</id>
+ <repositories>
+ <repository>
+ <id>central</id>
+ <url>https://repo.maven.apache.org/maven2</url>
+ <releases>
+ <enabled>true</enabled>
+ </releases>
+ <snapshots>
+ <enabled>false</enabled>
+ </snapshots>
+ </repository>
+ <repository>
+ <id>staged-releases</id>
+
<url>https://repository.apache.org/content/groups/staging/</url>
+ </repository>
+ </repositories>
+ <pluginRepositories>
+ <pluginRepository>
+ <id>staged-releases</id>
+
<url>https://repository.apache.org/content/groups/staging/</url>
+ </pluginRepository>
+ </pluginRepositories>
+ </profile>
+ </profiles>
+
+ <activeProfiles>
+ <activeProfile>local-repo</activeProfile>
+ </activeProfiles>
+</settings>
diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index e64ec7d7..5cfd38a1 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -11,14 +11,16 @@ jobs:
computer-ci:
runs-on: ubuntu-latest
env:
+ USE_STAGE: 'true' # Whether to include the stage repository.
TRAVIS_DIR: computer-dist/src/assembly/travis
KUBERNETES_VERSION: 1.20.1
- HUGEGRAPH_SERVER_COMMIT_ID: d01c8737d7d5909119671953521f1401dcd1a188
BSP_ETCD_URL: http://localhost:2579
+ # TODO: delete this env in the future (replaced by docker way now)
+ HUGEGRAPH_SERVER_COMMIT_ID: d01c8737d7d5909119671953521f1401dcd1a188
steps:
- name: Checkout
- uses: actions/checkout@v3
+ uses: actions/checkout@v4
with:
fetch-depth: 2
@@ -57,12 +59,6 @@ jobs:
- name: Setup Minikube-Kubernetes
run: $TRAVIS_DIR/install-k8s.sh
- - name: Check Component
- run: |
- sleep 5
- curl localhost:9000
- kubectl get nodes
-
- name: Cache Maven packages
uses: actions/cache@v3
with:
@@ -70,11 +66,16 @@ jobs:
key: ${{ runner.os }}-m2-${{ hashFiles('**/pom.xml') }}
restore-keys: ${{ runner.os }}-m2
+ - name: Check Component
+ run: |
+ curl localhost:9000
+ kubectl get nodes
+
- name: Prepare env and service
run: |
$TRAVIS_DIR/install-env.sh
- $TRAVIS_DIR/install-hugegraph-from-source.sh
$HUGEGRAPH_SERVER_COMMIT_ID
$TRAVIS_DIR/load-data-into-hugegraph.sh
+ #$TRAVIS_DIR/install-hugegraph-from-source.sh
$HUGEGRAPH_SERVER_COMMIT_ID
- name: Install JDK 11
uses: actions/setup-java@v3
@@ -82,8 +83,14 @@ jobs:
java-version: '11'
distribution: 'zulu'
+ - name: Use staged maven repo
+ if: ${{ env.USE_STAGE == 'true' }}
+ run: |
+ cp $HOME/.m2/settings.xml /tmp/settings.xml
+ mv -vf .github/configs/settings.xml $HOME/.m2/settings.xml
+
- name: Compile
- run: mvn clean compile -Dmaven.javadoc.skip=true -ntp
+ run: mvn clean compile -e -Dmaven.javadoc.skip=true -ntp
- name: Integrate test
run: mvn test -P integrate-test -ntp
@@ -92,6 +99,7 @@ jobs:
run: mvn test -P unit-test -ntp
- name: Upload coverage to Codecov
- uses: codecov/[email protected]
+ uses: codecov/codecov-action@v3
with:
+ token: ${{ secrets.CODECOV_TOKEN }}
file: target/site/jacoco/jacoco.xml
diff --git a/.github/workflows/codeql-analysis.yml
b/.github/workflows/codeql-analysis.yml
index 7bc30f62..32bebd0a 100644
--- a/.github/workflows/codeql-analysis.yml
+++ b/.github/workflows/codeql-analysis.yml
@@ -4,13 +4,15 @@ name: "CodeQL"
on:
pull_request:
- # The branches below must be a subset of the branches above, now enable it
in all PR
- # branches: [ master ]
+ # The branches below must be a subset of the branches above, now enable it
in all PR
+ # branches: [ master ]
schedule:
- cron: '45 7 * * 1'
jobs:
analyze:
+ env:
+ USE_STAGE: 'true' # Whether to include the stage repository.
name: Analyze
runs-on: ubuntu-latest
permissions:
@@ -24,43 +26,55 @@ jobs:
language: [ 'go', 'java' ]
steps:
- - name: Checkout repository
- uses: actions/checkout@v3
+ - name: Checkout repository
+ uses: actions/checkout@v4
- # Initializes the CodeQL tools for scanning.
- - name: Initialize CodeQL
- uses: github/codeql-action/init@v2
- with:
- languages: ${{ matrix.language }}
- # If you wish to specify custom queries, you can do so here or in a
config file.
- # By default, queries listed here will override any specified in a
config file.
- # Prefix the list here with "+" to use these queries and those in the
config file.
- # queries: ./path/to/local/query, your-org/your-repo/queries@main
+ - name: Setup Java JDK
+ uses: actions/setup-java@v3
+ with:
+ distribution: 'zulu'
+ java-version: '11'
- # Autobuild attempts to build any compiled languages (C/C++, C#, or Java).
- # If this step fails, then you should remove it and run the build manually
(see below)
- - name: Autobuild
- uses: github/codeql-action/autobuild@v2
+ - name: use staged maven repo settings
+ if: ${{ env.USE_STAGE == 'true' }}
+ run: |
+ cp $HOME/.m2/settings.xml /tmp/settings.xml
+ mv -vf .github/configs/settings.xml $HOME/.m2/settings.xml
- # ℹ️ Command-line programs to run using the OS shell.
- # 📚 https://git.io/JvXDl
+ # Initializes the CodeQL tools for scanning.
+ - name: Initialize CodeQL
+ uses: github/codeql-action/init@v2
+ with:
+ languages: ${{ matrix.language }}
+ # If you wish to specify custom queries, you can do so here or in a
config file.
+ # By default, queries listed here will override any specified in a
config file.
+ # Prefix the list here with "+" to use these queries and those in
the config file.
+ # queries: ./path/to/local/query, your-org/your-repo/queries@main
- # ✏️ If the Autobuild fails above, remove it and uncomment the following
three lines
- # and modify them (or add more) to build your code if your project
- # uses a compiled language
+ # Autobuild attempts to build any compiled languages (C/C++, C#, or
Java).
+ # If this step fails, then you should remove it and run the build
manually (see below)
+ - name: Autobuild
+ uses: github/codeql-action/autobuild@v2
- #- run: |
- # make bootstrap
- # make release
+ # ℹ️ Command-line programs to run using the OS shell.
+ # 📚 https://git.io/JvXDl
- - name: Perform CodeQL Analysis
- uses: github/codeql-action/analyze@v2
+ # ✏️ If the Autobuild fails above, remove it and uncomment the following
three lines
+ # and modify them (or add more) to build your code if your project
+ # uses a compiled language
+
+ #- run: |
+ # make bootstrap
+ # make release
+
+ - name: Perform CodeQL Analysis
+ uses: github/codeql-action/analyze@v2
dependency-review:
runs-on: ubuntu-latest
steps:
- name: 'Checkout Repository'
- uses: actions/checkout@v3
+ uses: actions/checkout@v4
- name: 'Dependency Review'
uses: actions/dependency-review-action@v3
diff --git a/.github/workflows/license-checker.yml
b/.github/workflows/license-checker.yml
index ea2c3687..caa3f85f 100644
--- a/.github/workflows/license-checker.yml
+++ b/.github/workflows/license-checker.yml
@@ -15,7 +15,7 @@ jobs:
check-license-header:
runs-on: ubuntu-latest
steps:
- - uses: actions/checkout@v3
+ - uses: actions/checkout@v4
# More info could refer to: https://github.com/apache/skywalking-eyes
- name: Check License Header
uses: apache/skywalking-eyes@main
@@ -36,25 +36,29 @@ jobs:
find ./ -name rat.txt -print0 | xargs -0 -I file cat file >
merged-rat.txt
grep "Binaries" merged-rat.txt -C 3 && cat merged-rat.txt
-# TODO: enable it later
-# check-dependency-license:
-# runs-on: ubuntu-latest
-# env:
-# SCRIPT_DEPENDENCY: computer-dist/scripts/dependency
-# steps:
-# - name: Checkout source
-# uses: actions/checkout@v3
-# - name: Set up JDK 11
-# uses: actions/setup-java@v3
-# with:
-# java-version: '11'
-# distribution: 'adopt'
-# - name: mvn install
-# run: |
-# mvn install -DskipTests=true -ntp
-# - name: generate current dependencies
-# run: |
-# bash $SCRIPT_DEPENDENCY/regenerate_known_dependencies.sh
current-dependencies.txt
-# - name: check third dependencies
-# run: |
-# bash $SCRIPT_DEPENDENCY/check_dependencies.sh
+ check-dependency-license:
+ runs-on: ubuntu-latest
+ env:
+ SCRIPT_DEPENDENCY: computer-dist/scripts/dependency
+ USE_STAGE: 'true' # Whether to include the stage repository.
+ steps:
+ - name: Checkout source
+ uses: actions/checkout@v4
+ - name: Set up JDK 11
+ uses: actions/setup-java@v3
+ with:
+ java-version: '11'
+ distribution: 'adopt'
+ - name: Use staged maven repo settings
+ if: ${{ env.USE_STAGE == 'true' }}
+ run: |
+ cp $HOME/.m2/settings.xml /tmp/settings.xml
+ mv -vf .github/configs/settings.xml $HOME/.m2/settings.xml
+ - name: Compile install
+ run: mvn package -DskipTests=true -ntp
+
+ # TODO: enable it after the check scripts are ready, lack them now
+ #- name: Generate & check current 3rd-party dependencies
+ # run: |
+ # bash $SCRIPT_DEPENDENCY/regenerate_known_dependencies.sh
current-dependencies.txt
+ # bash $SCRIPT_DEPENDENCY/check_dependencies.sh
diff --git a/.github/workflows/stale.yml b/.github/workflows/stale.yml
index 3ba4f72b..5b5cdc21 100644
--- a/.github/workflows/stale.yml
+++ b/.github/workflows/stale.yml
@@ -13,7 +13,7 @@ jobs:
pull-requests: write
steps:
- - uses: actions/stale@v3
+ - uses: actions/stale@v8
with:
repo-token: ${{ secrets.GITHUB_TOKEN }}
stale-issue-message: 'Due to the lack of activity, the current issue
is marked as stale and will be closed after 20 days, any update will remove the
stale label'
diff --git a/checkstyle.xml b/checkstyle.xml
index 96156a36..ef00578c 100644
--- a/checkstyle.xml
+++ b/checkstyle.xml
@@ -35,7 +35,8 @@
<module name="TreeWalker">
<!--检查行长度-->
<module name="LineLength">
- <property name="max" value="100"/>
+ <!-- Use 101 due to some legacy code reason -->
+ <property name="max" value="101"/>
<!--可以忽略的行-->
<property name="ignorePattern"
value="^package.*|^import.*|a
href|href|http://|https://|ftp://"/>
@@ -84,7 +85,8 @@
<module name="WhitespaceAround"/>
<!--左圆括号之后和右圆括号之前是否需要有一个空格,不需要-->
<module name="ParenPad"/>
-
<!--检查修饰符是否符合Java建议,顺序是:public、protected、private、abstract、default、static、final、transient、volatile、synchronized、native、strictfp-->
+
<!--检查修饰符是否符合Java建议,顺序是:public、protected、private、abstract、default、static、
+ final、transient、volatile、synchronized、native、strictfp-->
<module name="ModifierOrder"/>
<!--检查代码块的左花括号的放置位置,必须在当前行的末尾-->
<module name="LeftCurly">
diff --git
a/computer-api/src/main/java/org/apache/hugegraph/computer/core/common/Constants.java
b/computer-api/src/main/java/org/apache/hugegraph/computer/core/common/Constants.java
index 73658f0a..846ccaac 100644
---
a/computer-api/src/main/java/org/apache/hugegraph/computer/core/common/Constants.java
+++
b/computer-api/src/main/java/org/apache/hugegraph/computer/core/common/Constants.java
@@ -59,7 +59,7 @@ public final class Constants {
public static final int FUTURE_TIMEOUT = 300;
/*
- * The timeout in millisecond for threadpool shutdown
+ * The timeout in millisecond for thread-pool shutdown
*/
public static final long SHUTDOWN_TIMEOUT = 5000L;
diff --git
a/computer-core/src/main/java/org/apache/hugegraph/computer/core/bsp/EtcdClient.java
b/computer-core/src/main/java/org/apache/hugegraph/computer/core/bsp/EtcdClient.java
index c4681be1..ce97cb48 100644
---
a/computer-core/src/main/java/org/apache/hugegraph/computer/core/bsp/EtcdClient.java
+++
b/computer-core/src/main/java/org/apache/hugegraph/computer/core/bsp/EtcdClient.java
@@ -65,8 +65,7 @@ public class EtcdClient {
"The endpoints can't be null");
E.checkArgumentNotNull(namespace,
"The namespace can't be null");
- ByteSequence namespaceSeq = ByteSequence.from(namespace.getBytes(
- ENCODING));
+ ByteSequence namespaceSeq =
ByteSequence.from(namespace.getBytes(ENCODING));
this.client = Client.builder().endpoints(endpoints)
.namespace(namespaceSeq).build();
this.watch = this.client.getWatchClient();
@@ -80,20 +79,14 @@ public class EtcdClient {
* @param value value to be associated with the specified key
*/
public void put(String key, byte[] value) {
- E.checkArgument(key != null,
- "The key can't be null.");
- E.checkArgument(value != null,
- "The value can't be null.");
+ E.checkArgument(key != null, "The key can't be null.");
+ E.checkArgument(value != null, "The value can't be null.");
try {
- this.kv.put(ByteSequence.from(key, ENCODING),
- ByteSequence.from(value))
- .get();
+ this.kv.put(ByteSequence.from(key, ENCODING),
ByteSequence.from(value)).get();
} catch (InterruptedException e) {
- throw new ComputerException(
- "Interrupted while putting with key='%s'", e, key);
+ throw new ComputerException("Interrupted while putting with
key='%s'", e, key);
} catch (ExecutionException e) {
- throw new ComputerException("Error while putting with key='%s'",
- e, key);
+ throw new ComputerException("Error while putting with key='%s'",
e, key);
}
}
@@ -110,8 +103,8 @@ public class EtcdClient {
* Returns the value to which the specified key is mapped.
* @param key The key to be found
* @param throwException whether to throw ComputerException if not found.
- * @return the value of specified key, null if not found and
- * throwException is set false
+ * @return the value of the specified key, null if not found,
+ * and throwException is set false
* @throws ComputerException if not found and throwException is set true
*/
public byte[] get(String key, boolean throwException) {
@@ -124,24 +117,20 @@ public class EtcdClient {
assert kvs.size() == 1;
return kvs.get(0).getValue().getBytes();
} else if (throwException) {
- throw new ComputerException("Can't find value for key='%s'",
- key);
+ throw new ComputerException("Can't find value for key='%s'",
key);
} else {
return null;
}
} catch (InterruptedException e) {
- throw new ComputerException(
- "Interrupted while getting with key='%s'", e, key);
+ throw new ComputerException("Interrupted while getting with
key='%s'", e, key);
} catch (ExecutionException e) {
- throw new ComputerException("Error while getting with key='%s'",
- e, key);
+ throw new ComputerException("Error while getting with key='%s'",
e, key);
}
}
/**
* Returns the value to which the specified key is mapped. If no
- * key exists, wait at most timeout milliseconds. Or throw
- * ComputerException if timeout
+ * key exists, wait for most time out milliseconds. Or throw
ComputerException if timeout
* @param key the key whose associated value is to be returned.
* @param timeout the max time in milliseconds to wait.
* @return the specified value in byte array to which the specified key is
@@ -149,11 +138,9 @@ public class EtcdClient {
*/
public byte[] get(String key, long timeout, long logInterval) {
E.checkArgumentNotNull(key, "The key can't be null");
- E.checkArgument(timeout > 0L,
- "The timeout must be > 0, but got: %s", timeout);
- E.checkArgument(logInterval > 0L,
- "The logInterval must be > 0, but got: %s",
- logInterval);
+ E.checkArgument(timeout > 0L, "The timeout must be > 0, but got: %s",
timeout);
+ E.checkArgument(logInterval > 0L, "The logInterval must be > 0, but
got: %s", logInterval);
+
ByteSequence keySeq = ByteSequence.from(key, ENCODING);
try {
GetResponse response = this.kv.get(keySeq).get();
@@ -162,16 +149,12 @@ public class EtcdClient {
return kvs.get(0).getValue().getBytes();
} else {
long revision = response.getHeader().getRevision();
- return this.waitAndGetFromPutEvent(keySeq, revision,
- timeout, logInterval);
+ return this.waitAndGetFromPutEvent(keySeq, revision, timeout,
logInterval);
}
} catch (InterruptedException e) {
- throw new ComputerException(
- "Interrupted while getting with key='%s'",
- e, key);
+ throw new ComputerException("Interrupted while getting with
key='%s'", e, key);
} catch (ExecutionException e) {
- throw new ComputerException("Error while getting with key='%s'",
- e, key);
+ throw new ComputerException("Error while getting with key='%s'",
e, key);
}
}
@@ -214,8 +197,7 @@ public class EtcdClient {
.withRevision(revision)
.withNoDelete(true)
.build();
- try (Watch.Watcher watcher = this.watch.watch(keySeq, watchOption,
- consumer)) {
+ try (Watch.Watcher ignored = this.watch.watch(keySeq, watchOption,
consumer)) {
return barrierEvent.await(timeout, logInterval, () -> {
LOG.info("Wait for key '{}' with timeout {}ms",
keySeq.toString(ENCODING), timeout);
@@ -225,7 +207,7 @@ public class EtcdClient {
/**
* Get the values of keys with the specified prefix.
- * If no key found, return empty list.
+ * If no key is found, return an empty list.
*/
public List<byte[]> getWithPrefix(String prefix) {
E.checkArgumentNotNull(prefix, "The prefix can't be null");
@@ -251,7 +233,7 @@ public class EtcdClient {
/**
* Get the expected count of values of keys with the specified prefix.
- * Throws ComputerException if there are no enough object.
+ * Throws ComputerException if there are no enough objects.
*/
public List<byte[]> getWithPrefix(String prefix, int count) {
E.checkArgumentNotNull(prefix,
@@ -284,12 +266,12 @@ public class EtcdClient {
}
/**
- * Get expected count of values with the key prefix with prefix. If there
- * is no count of keys, wait at most timeout milliseconds.
+ * Get the expected count of values with the key prefix with prefix.
+ * If there is no count of keys, wait at max timeout milliseconds.
* @param prefix the key prefix
- * @param count the expected count of values to be get
+ * @param count the expected count of values to be got
* @param timeout the max wait time
- * @param logInterval the interval in ms to log message
+ * @param logInterval the interval in ms to log a message
* @return the list of values which key with specified prefix
*/
public List<byte[]> getWithPrefix(String prefix, int count,
@@ -329,8 +311,8 @@ public class EtcdClient {
/**
* Wait at most expected eventCount events triggered in timeout ms.
- * This method wait at most timeout ms regardless whether expected
- * eventCount events triggered.
+ * This method waits at most timeout ms regardless of whether
expected-eventCount events
+ * triggered.
* @param existedKeyValues readonly
*/
private List<byte[]> waitAndPrefixGetFromPutEvent(
@@ -368,21 +350,18 @@ public class EtcdClient {
.withPrefix(prefixSeq)
.withRevision(revision)
.build();
- try (Watch.Watcher watcher = this.watch.watch(prefixSeq,
- watchOption,
- consumer)) {
+ try (Watch.Watcher ignored = this.watch.watch(prefixSeq, watchOption,
consumer)) {
return barrierEvent.await(timeout, logInterval, () -> {
LOG.info("Wait for keys with prefix '{}' and timeout {}ms, " +
"expect {} keys but actual got {} keys",
- prefixSeq.toString(ENCODING),
- timeout, count, keyValues.size());
+ prefixSeq.toString(ENCODING), timeout, count,
keyValues.size());
});
}
}
/**
* @return 1 if deleted specified key, 0 if not found specified key
- * The deleted data can be get through revision, if revision is compacted,
+ * The deleted data can be got through revision, if revision is compacted,
* throw exception "etcdserver: mvcc: required revision has been
compacted".
* @see <a href="https://etcd.io/docs/v3.4.0/op-guide/maintenance/">
* Maintenance</a>
@@ -391,12 +370,10 @@ public class EtcdClient {
E.checkArgumentNotNull(key, "The key can't be null");
ByteSequence keySeq = ByteSequence.from(key, ENCODING);
try {
- DeleteResponse response = this.client.getKVClient().delete(keySeq)
- .get();
+ DeleteResponse response =
this.client.getKVClient().delete(keySeq).get();
return response.getDeleted();
} catch (InterruptedException e) {
- throw new ComputerException("Interrupted while deleting '%s'",
- e, key);
+ throw new ComputerException("Interrupted while deleting '%s'", e,
key);
} catch (ExecutionException e) {
throw new ComputerException("Error while deleting '%s'", e, key);
}
@@ -408,12 +385,10 @@ public class EtcdClient {
public long deleteWithPrefix(String prefix) {
E.checkArgumentNotNull(prefix, "The prefix can't be null");
ByteSequence prefixSeq = ByteSequence.from(prefix, ENCODING);
- DeleteOption deleteOption = DeleteOption.newBuilder()
- .withPrefix(prefixSeq).build();
+ DeleteOption deleteOption =
DeleteOption.newBuilder().withPrefix(prefixSeq).build();
try {
DeleteResponse response = this.client.getKVClient()
- .delete(prefixSeq,
- deleteOption)
+ .delete(prefixSeq,
deleteOption)
.get();
return response.getDeleted();
} catch (InterruptedException e) {
diff --git
a/computer-core/src/main/java/org/apache/hugegraph/computer/core/master/MasterService.java
b/computer-core/src/main/java/org/apache/hugegraph/computer/core/master/MasterService.java
index 06fb7564..da01fa7b 100644
---
a/computer-core/src/main/java/org/apache/hugegraph/computer/core/master/MasterService.java
+++
b/computer-core/src/main/java/org/apache/hugegraph/computer/core/master/MasterService.java
@@ -66,11 +66,10 @@ public class MasterService implements Closeable {
private Config config;
private volatile Bsp4Master bsp4Master;
private ContainerInfo masterInfo;
- private List<ContainerInfo> workers;
private int maxSuperStep;
private MasterComputation masterComputation;
- private volatile ShutdownHook shutdownHook;
+ private final ShutdownHook shutdownHook;
private volatile Thread serviceThread;
public MasterService() {
@@ -101,7 +100,7 @@ public class MasterService implements Closeable {
rpcAddress.getPort());
/*
* Connect to BSP server and clean the old data may be left by the
- * previous job with same job id.
+ * previous job with the same job id.
*/
this.bsp4Master = new Bsp4Master(this.config);
this.bsp4Master.clean();
@@ -114,9 +113,9 @@ public class MasterService implements Closeable {
LOG.info("{} register MasterService", this);
this.bsp4Master.masterInitDone(this.masterInfo);
- this.workers = this.bsp4Master.waitWorkersInitDone();
+ List<ContainerInfo> workers = this.bsp4Master.waitWorkersInitDone();
LOG.info("{} waited all workers registered, workers count: {}",
- this, this.workers.size());
+ this, workers.size());
LOG.info("{} MasterService initialized", this);
this.inited = true;
@@ -141,24 +140,36 @@ public class MasterService implements Closeable {
}
/**
- * Stop the the master service. Stop the managers created in
- * {@link #init(Config)}.
+ * Stop the master service. Stop the managers created in {@link
#init(Config)}.
*/
@Override
public synchronized void close() {
- this.checkInited();
+ // TODO: check the logic of close carefully later
+ //this.checkInited();
if (this.closed) {
LOG.info("{} MasterService had closed before", this);
return;
}
- this.masterComputation.close(new DefaultMasterContext());
+ try {
+ if (this.masterComputation != null) {
+ this.masterComputation.close(new DefaultMasterContext());
+ }
+ } catch (Exception e) {
+ LOG.error("Error occurred while closing master service", e);
+ }
- if (!failed) {
+ if (!failed && this.bsp4Master != null) {
this.bsp4Master.waitWorkersCloseDone();
}
- this.managers.closeAll(this.config);
+ try {
+ if (managers != null) {
+ this.managers.closeAll(this.config);
+ }
+ } catch (Exception e) {
+ LOG.error("Error occurred while closing managers", e);
+ }
this.cleanAndCloseBsp();
this.shutdownHook.unhook();
@@ -333,7 +344,7 @@ public class MasterService implements Closeable {
* 1): Has run maxSuperStep times of superstep iteration.
* 2): The mater-computation returns false that stop superstep iteration.
* 3): All vertices are inactive and no message sent in a superstep.
- * @param masterContinue The master-computation decide
+ * @param masterContinue The master-computation decides
* @return true if finish superstep iteration.
*/
private boolean finishedIteration(boolean masterContinue,
@@ -351,7 +362,7 @@ public class MasterService implements Closeable {
/**
* Coordinate with workers to load vertices and edges from HugeGraph. There
- * are two phases in inputstep. First phase is get input splits from
+ * are two phases in inputstep. The First phase is to get input splits from
* master, and read the vertices and edges from input splits. The second
* phase is after all workers read input splits, the workers merge the
* vertices and edges to get the stats for each partition.
@@ -371,8 +382,7 @@ public class MasterService implements Closeable {
}
/**
- * Wait the workers write result back. After this, the job is finished
- * successfully.
+ * Wait the workers write a result back. After this, the job is finished
successfully.
*/
private void outputstep() {
LOG.info("{} MasterService outputstep started", this);
@@ -400,7 +410,7 @@ public class MasterService implements Closeable {
"The aggregator class can't be null");
Aggregator<V> aggr;
try {
- aggr = aggregatorClass.newInstance();
+ aggr = aggregatorClass.getDeclaredConstructor().newInstance();
} catch (Exception e) {
throw new ComputerException("Can't new instance from class:
%s",
e, aggregatorClass.getName());
diff --git
a/computer-core/src/main/java/org/apache/hugegraph/computer/core/network/session/ClientSession.java
b/computer-core/src/main/java/org/apache/hugegraph/computer/core/network/session/ClientSession.java
index 6f904d88..44732ad4 100644
---
a/computer-core/src/main/java/org/apache/hugegraph/computer/core/network/session/ClientSession.java
+++
b/computer-core/src/main/java/org/apache/hugegraph/computer/core/network/session/ClientSession.java
@@ -133,11 +133,9 @@ public class ClientSession extends TransportSession {
} catch (Throwable e) {
this.stateEstablished();
if (e instanceof TimeoutException) {
- throw new TransportException(
- "Timeout(%sms) to wait finish-response", timeout);
+ throw new TransportException("Timeout(%sms) to wait
finish-response", timeout);
} else {
- throw new TransportException("Failed to wait finish-response",
- e);
+ throw new TransportException("Failed to wait finish-response",
e);
}
} finally {
this.finishedFutureRef.compareAndSet(finishFuture, null);
@@ -150,13 +148,11 @@ public class ClientSession extends TransportSession {
"at finishAsync()", this.state);
CompletableFuture<Void> finishedFuture = new CompletableFuture<>();
- boolean success = this.finishedFutureRef.compareAndSet(null,
- finishedFuture);
+ boolean success = this.finishedFutureRef.compareAndSet(null,
finishedFuture);
E.checkArgument(success, "The finishedFutureRef value must be null " +
"at finishAsync()");
int finishId = this.genFinishId();
-
this.stateFinishSent(finishId);
try {
FinishMessage finishMessage = new FinishMessage(finishId);
diff --git
a/computer-core/src/main/java/org/apache/hugegraph/computer/core/worker/WorkerService.java
b/computer-core/src/main/java/org/apache/hugegraph/computer/core/worker/WorkerService.java
index fc5b5dc5..8d776b32 100644
---
a/computer-core/src/main/java/org/apache/hugegraph/computer/core/worker/WorkerService.java
+++
b/computer-core/src/main/java/org/apache/hugegraph/computer/core/worker/WorkerService.java
@@ -61,22 +61,20 @@ public class WorkerService implements Closeable {
private static final Logger LOG = Log.logger(WorkerService.class);
+ private volatile boolean inited;
+ private volatile boolean closed;
+
private final ComputerContext context;
- private final Managers managers;
private final Map<Integer, ContainerInfo> workers;
+ private final Managers managers;
+ private final ShutdownHook shutdownHook;
- private volatile boolean inited;
- private volatile boolean closed;
- private Config config;
private Bsp4Worker bsp4Worker;
+ private Config config;
private ComputeManager computeManager;
private ContainerInfo workerInfo;
-
private Combiner<Value> combiner;
- private ContainerInfo masterInfo;
-
- private volatile ShutdownHook shutdownHook;
private volatile Thread serviceThread;
public WorkerService() {
@@ -91,57 +89,67 @@ public class WorkerService implements Closeable {
/**
* Init worker service, create the managers used by worker service.
*/
- public void init(Config config) {
- E.checkArgument(!this.inited, "The %s has been initialized", this);
-
- this.serviceThread = Thread.currentThread();
- this.registerShutdownHook();
-
- this.config = config;
-
- this.workerInfo = new ContainerInfo();
- LOG.info("{} Start to initialize worker", this);
-
- this.bsp4Worker = new Bsp4Worker(this.config, this.workerInfo);
+ public synchronized void init(Config config) {
+ try {
+ LOG.info("{} Prepare to init WorkerService", this);
+ // TODO: what will happen if init() is called by multiple threads?
+ E.checkArgument(!this.inited, "The %s has been initialized", this);
- /*
- * Keep the waitMasterInitDone() called before initManagers(),
- * in order to ensure master init() before worker managers init()
- */
- this.masterInfo = this.bsp4Worker.waitMasterInitDone();
+ this.serviceThread = Thread.currentThread();
+ this.registerShutdownHook();
+ this.config = config;
+ this.workerInfo = new ContainerInfo();
- InetSocketAddress address = this.initManagers(this.masterInfo);
- this.workerInfo.updateAddress(address);
+ LOG.info("{} Start to initialize worker", this);
+ this.bsp4Worker = new Bsp4Worker(this.config, this.workerInfo);
+ /*
+ * Keep the waitMasterInitDone() called before initManagers(),
+ * in order to ensure master init() before worker managers init()
+ */
+ ContainerInfo masterInfo = this.bsp4Worker.waitMasterInitDone();
+ InetSocketAddress address = this.initManagers(masterInfo);
+ this.workerInfo.updateAddress(address);
+ this.loadComputation();
+
+ LOG.info("{} register WorkerService", this);
+ this.bsp4Worker.workerInitDone();
+ this.connectToWorkers();
+
+ this.computeManager = new ComputeManager(this.workerInfo.id(),
this.context,
+ this.managers);
+
+ this.managers.initedAll(this.config);
+ LOG.info("{} WorkerService initialized", this);
+ this.inited = true;
+ } catch (Exception e) {
+ LOG.error("Error while initializing WorkerService", e);
+ // TODO: shall we call close() here?
+ throw e;
+ }
+ }
+ private void loadComputation() {
Computation<?> computation = this.config.createObject(
- ComputerOptions.WORKER_COMPUTATION_CLASS);
+ ComputerOptions.WORKER_COMPUTATION_CLASS);
LOG.info("Loading computation '{}' in category '{}'",
computation.name(), computation.category());
- this.combiner = this.config.createObject(
- ComputerOptions.WORKER_COMBINER_CLASS, false);
+ this.combiner =
this.config.createObject(ComputerOptions.WORKER_COMBINER_CLASS, false);
if (this.combiner == null) {
- LOG.info("None combiner is provided for computation '{}'",
- computation.name());
+ LOG.info("None combiner is provided for computation '{}'",
computation.name());
} else {
LOG.info("Combiner '{}' is provided for computation '{}'",
this.combiner.name(), computation.name());
}
+ }
- LOG.info("{} register WorkerService", this);
- this.bsp4Worker.workerInitDone();
+ private void connectToWorkers() {
List<ContainerInfo> workers = this.bsp4Worker.waitMasterAllInitDone();
DataClientManager dm = this.managers.get(DataClientManager.NAME);
for (ContainerInfo worker : workers) {
this.workers.put(worker.id(), worker);
dm.connect(worker.id(), worker.hostname(), worker.dataPort());
}
-
- this.computeManager = new ComputeManager(this.workerInfo.id(),
this.context, this.managers);
-
- this.managers.initedAll(this.config);
- LOG.info("{} WorkerService initialized", this);
- this.inited = true;
}
private void registerShutdownHook() {
@@ -152,29 +160,50 @@ public class WorkerService implements Closeable {
}
/**
- * Stop the worker service. Stop the managers created in
- * {@link #init(Config)}.
+ * Stop the worker service. Stop the managers created in {@link
#init(Config)}.
*/
@Override
public synchronized void close() {
- this.checkInited();
+ // TODO: why checkInited() here, if init throws exception, how to
close the resource?
+ //this.checkInited();
if (this.closed) {
LOG.info("{} WorkerService had closed before", this);
return;
}
- this.computeManager.close();
+ try {
+ if (this.computeManager != null) {
+ this.computeManager.close();
+ } else {
+ LOG.warn("The computeManager is null");
+ return;
+ }
+ } catch (Exception e) {
+ LOG.error("Error when closing ComputeManager", e);
+ }
/*
* Seems managers.closeAll() would do the following actions:
* TODO: close the connection to other workers.
* TODO: stop the connection to the master
* TODO: stop the data transportation server.
*/
- this.managers.closeAll(this.config);
+ try {
+ this.managers.closeAll(this.config);
+ } catch (Exception e) {
+ LOG.error("Error while closing managers", e);
+ }
- this.bsp4Worker.workerCloseDone();
- this.bsp4Worker.close();
- this.shutdownHook.unhook();
+ try {
+ this.bsp4Worker.workerCloseDone();
+ this.bsp4Worker.close();
+ } catch (Exception e) {
+ LOG.error("Error while closing bsp4Worker", e);
+ }
+ try {
+ this.shutdownHook.unhook();
+ } catch (Exception e) {
+ LOG.error("Error while unhooking shutdownHook", e);
+ }
this.closed = true;
LOG.info("{} WorkerService closed", this);
@@ -201,14 +230,13 @@ public class WorkerService implements Closeable {
}
/**
- * Execute the superstep in worker. It first wait master witch superstep
+ * Execute the superstep in worker. It first waits for the master to tell which superstep
* to start from. And then do the superstep iteration until master's
* superstepStat is inactive.
*/
public void execute() {
- this.checkInited();
-
LOG.info("{} WorkerService execute", this);
+ this.checkInited();
// TODO: determine superstep if fail over is enabled.
int superstep = this.bsp4Worker.waitMasterResumeDone();
@@ -227,8 +255,7 @@ public class WorkerService implements Closeable {
* superstep.
*/
while (superstepStat.active()) {
- WorkerContext context = new SuperstepContext(superstep,
- superstepStat);
+ WorkerContext context = new SuperstepContext(superstep,
superstepStat);
LOG.info("Start computation of superstep {}", superstep);
if (superstep > 0) {
this.computeManager.takeRecvedMessages();
@@ -242,13 +269,12 @@ public class WorkerService implements Closeable {
this.managers.beforeSuperstep(this.config, superstep);
/*
- * Notify master by each worker, when the master received all
+ * Notify the master by each worker; once the master has received all
* workers signal, then notify all workers to do compute().
*/
this.bsp4Worker.workerStepPrepareDone(superstep);
this.bsp4Worker.waitMasterStepPrepareDone(superstep);
- WorkerStat workerStat = this.computeManager.compute(context,
- superstep);
+ WorkerStat workerStat = this.computeManager.compute(context,
superstep);
this.bsp4Worker.workerStepComputeDone(superstep);
this.bsp4Worker.waitMasterStepComputeDone(superstep);
@@ -274,8 +300,7 @@ public class WorkerService implements Closeable {
@Override
public String toString() {
- Object id = this.workerInfo == null ?
- "?" + this.hashCode() : this.workerInfo.id();
+ Object id = this.workerInfo == null ? "?" + this.hashCode() :
this.workerInfo.id();
return String.format("[worker %s]", id);
}
@@ -287,13 +312,11 @@ public class WorkerService implements Closeable {
* NOTE: this init() method will be called twice, will be ignored at
* the 2nd time call.
*/
- WorkerRpcManager.updateRpcRemoteServerConfig(this.config,
- masterInfo.hostname(),
+ WorkerRpcManager.updateRpcRemoteServerConfig(this.config,
masterInfo.hostname(),
masterInfo.rpcPort());
rpcManager.init(this.config);
- WorkerAggrManager aggregatorManager = new WorkerAggrManager(
- this.context);
+ WorkerAggrManager aggregatorManager = new
WorkerAggrManager(this.context);
aggregatorManager.service(rpcManager.aggregateRpcService());
this.managers.add(aggregatorManager);
FileManager fileManager = new FileManager();
@@ -307,30 +330,22 @@ public class WorkerService implements Closeable {
this.managers.add(recvManager);
ConnectionManager connManager = new TransportConnectionManager();
- DataServerManager serverManager = new DataServerManager(connManager,
- recvManager);
+ DataServerManager serverManager = new DataServerManager(connManager,
recvManager);
this.managers.add(serverManager);
- DataClientManager clientManager = new DataClientManager(connManager,
- this.context);
+ DataClientManager clientManager = new DataClientManager(connManager,
this.context);
this.managers.add(clientManager);
SortManager sendSortManager = new SendSortManager(this.context);
this.managers.add(sendSortManager);
- MessageSendManager sendManager = new MessageSendManager(this.context,
- sendSortManager,
- clientManager.sender());
+ MessageSendManager sendManager = new MessageSendManager(this.context,
sendSortManager,
+
clientManager.sender());
this.managers.add(sendManager);
-
- SnapshotManager snapshotManager = new SnapshotManager(this.context,
- sendManager,
- recvManager,
- this.workerInfo);
+ SnapshotManager snapshotManager = new SnapshotManager(this.context,
sendManager,
+ recvManager,
this.workerInfo);
this.managers.add(snapshotManager);
-
- WorkerInputManager inputManager = new WorkerInputManager(this.context,
- sendManager,
+ WorkerInputManager inputManager = new WorkerInputManager(this.context,
sendManager,
snapshotManager);
inputManager.service(rpcManager.inputSplitService());
this.managers.add(inputManager);
@@ -339,8 +354,8 @@ public class WorkerService implements Closeable {
this.managers.initAll(this.config);
InetSocketAddress address = serverManager.address();
- LOG.info("{} WorkerService initialized managers with data server " +
- "address '{}'", this, address);
+ LOG.info("{} WorkerService initialized managers with data server
address '{}'",
+ this, address);
return address;
}
@@ -350,7 +365,7 @@ public class WorkerService implements Closeable {
/**
* Load vertices and edges from HugeGraph. There are two phases in
- * inputstep. First phase is get input splits from master, and read the
+ * inputstep. The first phase is to get input splits from the master, and read
the
* vertices and edges from input splits. The second phase is after all
* workers read input splits, the workers merge the vertices and edges to
* get the stats for each partition.
@@ -365,10 +380,8 @@ public class WorkerService implements Closeable {
WorkerStat workerStat = this.computeManager.input();
- this.bsp4Worker.workerStepDone(Constants.INPUT_SUPERSTEP,
- workerStat);
- SuperstepStat superstepStat = this.bsp4Worker.waitMasterStepDone(
- Constants.INPUT_SUPERSTEP);
+ this.bsp4Worker.workerStepDone(Constants.INPUT_SUPERSTEP, workerStat);
+ SuperstepStat superstepStat =
this.bsp4Worker.waitMasterStepDone(Constants.INPUT_SUPERSTEP);
manager.close(this.config);
LOG.info("{} WorkerService inputstep finished", this);
return superstepStat;
@@ -395,10 +408,8 @@ public class WorkerService implements Closeable {
private SuperstepContext(int superstep, SuperstepStat superstepStat) {
this.superstep = superstep;
this.superstepStat = superstepStat;
- this.aggrManager = WorkerService.this.managers.get(
- WorkerAggrManager.NAME);
- this.sendManager = WorkerService.this.managers.get(
- MessageSendManager.NAME);
+ this.aggrManager =
WorkerService.this.managers.get(WorkerAggrManager.NAME);
+ this.sendManager =
WorkerService.this.managers.get(MessageSendManager.NAME);
}
@Override
diff --git a/computer-dist/src/assembly/dataset/struct.json
b/computer-dist/src/assembly/dataset/struct.json
index ed571ca4..c1d8be1e 100644
--- a/computer-dist/src/assembly/dataset/struct.json
+++ b/computer-dist/src/assembly/dataset/struct.json
@@ -6,7 +6,7 @@
"skip": false,
"input": {
"type": "FILE",
- "path":
"computer-dist/src/assembly/dataset/ml-latest-small/ratings.csv",
+ "path": "/dataset/ml-latest-small/ratings.csv",
"file_filter": {
"extensions": [
"*"
@@ -83,7 +83,7 @@
"skip": false,
"input": {
"type": "FILE",
- "path": "computer-dist/src/assembly/dataset/ml-latest-small/tags.csv",
+ "path": "/dataset/ml-latest-small/tags.csv",
"file_filter": {
"extensions": [
"*"
@@ -160,7 +160,7 @@
"skip": false,
"input": {
"type": "FILE",
- "path":
"computer-dist/src/assembly/dataset/ml-latest-small/movies.csv",
+ "path": "/dataset/ml-latest-small/movies.csv",
"file_filter": {
"extensions": [
"*"
diff --git a/computer-dist/src/assembly/travis/install-hugegraph-from-source.sh
b/computer-dist/src/assembly/travis/install-hugegraph-from-source.sh
index af5281de..b5365caa 100755
--- a/computer-dist/src/assembly/travis/install-hugegraph-from-source.sh
+++ b/computer-dist/src/assembly/travis/install-hugegraph-from-source.sh
@@ -15,6 +15,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
+
+# Note: this script is not used in github-ci now, keep it for other env
set -ev
if [[ $# -ne 1 ]]; then
@@ -40,4 +42,5 @@ chmod -R 755 bin/
bin/init-store.sh || exit 1
bin/start-hugegraph.sh || cat logs/hugegraph-server.log
+
cd ../
diff --git a/computer-dist/src/assembly/travis/install-k8s.sh
b/computer-dist/src/assembly/travis/install-k8s.sh
index 4619fa91..d18f4928 100755
--- a/computer-dist/src/assembly/travis/install-k8s.sh
+++ b/computer-dist/src/assembly/travis/install-k8s.sh
@@ -17,6 +17,7 @@
#
set -ev
+# TODO: could replace by docker way
curl -Lo minikube
https://storage.googleapis.com/minikube/releases/latest/minikube-linux-amd64 &&
chmod +x minikube
sudo mkdir -p /usr/local/bin/
sudo install minikube /usr/local/bin/
diff --git a/computer-dist/src/assembly/travis/load-data-into-hugegraph.sh
b/computer-dist/src/assembly/travis/load-data-into-hugegraph.sh
index e00890b6..b448b661 100755
--- a/computer-dist/src/assembly/travis/load-data-into-hugegraph.sh
+++ b/computer-dist/src/assembly/travis/load-data-into-hugegraph.sh
@@ -21,27 +21,43 @@ set -ev
TRAVIS_DIR=$(dirname "$0")
DATASET_DIR=${TRAVIS_DIR}/../dataset
-HUGEGRAPH_LOADER_GIT_URL="https://github.com/apache/hugegraph-toolchain.git"
-
-git clone --depth 10 ${HUGEGRAPH_LOADER_GIT_URL} hugegraph-toolchain
-
-cd hugegraph-toolchain
-mvn install -P stage -pl hugegraph-client,hugegraph-loader -am -DskipTests -ntp
-
-cd hugegraph-loader
-tar -zxf target/apache-hugegraph-loader-*.tar.gz || exit 1
-cd ../../
+docker network create ci
+# Note: we need wait for server start finished, so start it first
+docker run -itd --name=graph --network ci -p 8080:8080
hugegraph/hugegraph:latest && sleep 6
wget http://files.grouplens.org/datasets/movielens/ml-latest-small.zip
unzip -d ${DATASET_DIR} ml-latest-small.zip
-hugegraph-toolchain/hugegraph-loader/apache-hugegraph-loader-*/bin/hugegraph-loader.sh
\
--g hugegraph -f ${DATASET_DIR}/struct.json -s ${DATASET_DIR}/schema.groovy ||
exit 1
+cd ${DATASET_DIR}/.. && pwd && ls -lh *
+
+docker run -id --name=loader --network ci hugegraph/loader:latest
+docker cp dataset loader:/dataset || exit 1
+
+docker exec -i loader ls -lh /dataset
+docker exec -i loader bin/hugegraph-loader.sh -g hugegraph -p 8080 -h graph \
+ -f /dataset/struct.json -s /dataset/schema.groovy || exit 1
# load dataset to hdfs
-sort -t , -k1n -u "${DATASET_DIR}"/ml-latest-small/ratings.csv | cut -d "," -f
1 > "${DATASET_DIR}"/ml-latest-small/user_id.csv || exit 1
+sort -t , -k1n -u dataset/ml-latest-small/ratings.csv | cut -d "," -f 1
>dataset/ml-latest-small/user_id.csv || exit 1
/opt/hadoop/bin/hadoop fs -mkdir -p /dataset/ml-latest-small || exit 1
-/opt/hadoop/bin/hadoop fs -put "${DATASET_DIR}"/ml-latest-small/*
/dataset/ml-latest-small || exit 1
+/opt/hadoop/bin/hadoop fs -put dataset/ml-latest-small/*
/dataset/ml-latest-small || exit 1
/opt/hadoop/bin/hadoop fs -ls /dataset/ml-latest-small
echo "Load finished, continue to next step"
+
+############# Note: this part is not used in github-ci now, backup it for
other env ##############
+#HUGEGRAPH_LOADER_GIT_URL="https://github.com/apache/hugegraph-toolchain.git"
+#git clone --depth 10 ${HUGEGRAPH_LOADER_GIT_URL} hugegraph-toolchain
+#
+#cd hugegraph-toolchain
+#mvn install -P stage -pl hugegraph-client,hugegraph-loader -am -DskipTests
-ntp
+#
+#cd hugegraph-loader
+#tar -zxf target/apache-hugegraph-loader-*.tar.gz || exit 1
+#cd ../../
+
+#wget http://files.grouplens.org/datasets/movielens/ml-latest-small.zip
+#unzip -d ${DATASET_DIR} ml-latest-small.zip
+
+#hugegraph-toolchain/hugegraph-loader/apache-hugegraph-loader-*/bin/hugegraph-loader.sh
\
+# -g hugegraph -f ${DATASET_DIR}/struct.json -s
${DATASET_DIR}/schema.groovy || exit 1
diff --git a/computer-dist/src/assembly/travis/start-etcd.sh
b/computer-dist/src/assembly/travis/start-etcd.sh
index 896d0832..fe3893ac 100644
--- a/computer-dist/src/assembly/travis/start-etcd.sh
+++ b/computer-dist/src/assembly/travis/start-etcd.sh
@@ -17,8 +17,9 @@
#
set -ev
-TRAVIS_DIR=`dirname $0`
+TRAVIS_DIR=$(dirname $0)
echo "Starting etcd..."
+# TODO: replace with docker way
wget -O ${TRAVIS_DIR}/etcd.tar.gz
https://github.com/etcd-io/etcd/releases/download/v3.5.0/etcd-v3.5.0-linux-amd64.tar.gz
mkdir ${TRAVIS_DIR}/etcd
tar -zxvf ${TRAVIS_DIR}/etcd.tar.gz -C ${TRAVIS_DIR}/etcd --strip-components 1
diff --git
a/computer-k8s-operator/src/main/java/org/apache/hugegraph/computer/k8s/operator/OperatorEntrypoint.java
b/computer-k8s-operator/src/main/java/org/apache/hugegraph/computer/k8s/operator/OperatorEntrypoint.java
index 9934f513..c440551a 100644
---
a/computer-k8s-operator/src/main/java/org/apache/hugegraph/computer/k8s/operator/OperatorEntrypoint.java
+++
b/computer-k8s-operator/src/main/java/org/apache/hugegraph/computer/k8s/operator/OperatorEntrypoint.java
@@ -19,6 +19,7 @@ package org.apache.hugegraph.computer.k8s.operator;
import java.io.IOException;
import java.io.OutputStream;
+import java.net.HttpURLConnection;
import java.net.InetSocketAddress;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
@@ -36,7 +37,6 @@ import java.util.concurrent.ExecutorService;
import org.apache.commons.configuration2.MapConfiguration;
import org.apache.commons.lang3.StringUtils;
-import org.apache.http.HttpStatus;
import org.apache.hugegraph.computer.k8s.Constants;
import org.apache.hugegraph.computer.k8s.operator.common.AbstractController;
import org.apache.hugegraph.computer.k8s.operator.config.OperatorOptions;
@@ -61,6 +61,11 @@ import
io.fabric8.kubernetes.client.NamespacedKubernetesClient;
import io.fabric8.kubernetes.client.informers.SharedInformerFactory;
import io.fabric8.kubernetes.client.utils.Utils;
+/**
+ * The OperatorEntrypoint class is the main entry point for the Kubernetes
operator.
+ * It sets up the Kubernetes client, registers controllers, and starts the
HTTP server for
+ * health checks.
+ */
public class OperatorEntrypoint {
private static final Logger LOG = Log.logger(OperatorEntrypoint.class);
@@ -74,15 +79,13 @@ public class OperatorEntrypoint {
public static void main(String[] args) {
OperatorEntrypoint operatorEntrypoint = new OperatorEntrypoint();
- Runtime.getRuntime().addShutdownHook(
- new Thread(operatorEntrypoint::shutdown));
+ Runtime.getRuntime().addShutdownHook(new
Thread(operatorEntrypoint::shutdown));
operatorEntrypoint.start();
}
static {
- OptionSpace.register(
- "computer-k8s-operator",
-
"org.apache.hugegraph.computer.k8s.operator.config.OperatorOptions"
+ OptionSpace.register("computer-k8s-operator",
+
"org.apache.hugegraph.computer.k8s.operator.config.OperatorOptions"
);
}
@@ -98,8 +101,7 @@ public class OperatorEntrypoint {
public void start() {
try {
this.kubeClient = new DefaultKubernetesClient();
- String watchNamespace = this.config.get(
- OperatorOptions.WATCH_NAMESPACE);
+ String watchNamespace =
this.config.get(OperatorOptions.WATCH_NAMESPACE);
if (!Objects.equals(watchNamespace, Constants.ALL_NAMESPACE)) {
this.createNamespace(watchNamespace);
this.kubeClient = this.kubeClient.inNamespace(watchNamespace);
@@ -108,19 +110,17 @@ public class OperatorEntrypoint {
LOG.info("Watch namespace: " + watchNamespace);
this.addHealthCheck();
-
this.registerControllers();
this.informerFactory.startAllRegisteredInformers();
this.informerFactory.addSharedInformerEventListener(exception -> {
- LOG.error("Informer event listener exception occurred",
- exception);
+ LOG.error("Informer event listener exception occurred",
exception);
OperatorEntrypoint.this.shutdown();
});
- // Start all controller
- this.controllerPool = ExecutorUtil.newFixedThreadPool(
- this.controllers.size(), "controllers-%d");
+ // Start all controllers
+ this.controllerPool =
ExecutorUtil.newFixedThreadPool(this.controllers.size(),
+
"controllers-%d");
CountDownLatch latch = new CountDownLatch(this.controllers.size());
List<CompletableFuture<Void>> futures = new ArrayList<>();
for (AbstractController<?> controller : this.controllers) {
@@ -141,8 +141,7 @@ public class OperatorEntrypoint {
}
});
- CompletableFuture.anyOf(futures.toArray(new CompletableFuture[]{}))
- .get();
+ CompletableFuture.anyOf(futures.toArray(new
CompletableFuture[]{})).get();
} catch (Throwable throwable) {
LOG.error("Failed to start Operator: ", throwable);
} finally {
@@ -201,16 +200,14 @@ public class OperatorEntrypoint {
}
private void registerControllers() {
- ComputerJobController jobController = new ComputerJobController(
- this.config, this.kubeClient);
- this.registerController(jobController,
- ConfigMap.class, Job.class, Pod.class);
+ ComputerJobController jobController = new
ComputerJobController(this.config,
+
this.kubeClient);
+ this.registerController(jobController, ConfigMap.class, Job.class,
Pod.class);
}
@SafeVarargs
- private final void registerController(
- AbstractController<?> controller,
- Class<? extends HasMetadata>... ownsClass) {
+ private void registerController(AbstractController<?> controller,
+ Class<? extends HasMetadata>... ownsClass)
{
controller.register(this.informerFactory, ownsClass);
this.controllers.add(controller);
}
@@ -222,7 +219,7 @@ public class OperatorEntrypoint {
this.httpServer = HttpServer.create(address, probeBacklog);
this.httpServer.createContext("/health", httpExchange -> {
byte[] bytes = "ALL GOOD!".getBytes(StandardCharsets.UTF_8);
- httpExchange.sendResponseHeaders(HttpStatus.SC_OK, bytes.length);
+ httpExchange.sendResponseHeaders(HttpURLConnection.HTTP_OK,
bytes.length);
OutputStream responseBody = httpExchange.getResponseBody();
responseBody.write(bytes);
responseBody.close();
@@ -233,7 +230,7 @@ public class OperatorEntrypoint {
private void addReadyCheck() {
this.httpServer.createContext("/ready", httpExchange -> {
byte[] bytes = "ALL Ready!".getBytes(StandardCharsets.UTF_8);
- httpExchange.sendResponseHeaders(HttpStatus.SC_OK, bytes.length);
+ httpExchange.sendResponseHeaders(HttpURLConnection.HTTP_OK,
bytes.length);
OutputStream responseBody = httpExchange.getResponseBody();
responseBody.write(bytes);
responseBody.close();
@@ -245,8 +242,7 @@ public class OperatorEntrypoint {
.withName(namespace)
.endMetadata();
KubeUtil.ignoreExists(() -> {
- return this.kubeClient.namespaces()
- .create(builder.build());
+ return this.kubeClient.namespaces().create(builder.build());
});
}
}
diff --git a/computer-test/pom.xml b/computer-test/pom.xml
index 4b55da32..16f17a7f 100644
--- a/computer-test/pom.xml
+++ b/computer-test/pom.xml
@@ -38,6 +38,12 @@
<dependency>
<groupId>org.apache.hugegraph</groupId>
<artifactId>hugegraph-common</artifactId>
+ <exclusions>
+ <exclusion>
+ <artifactId>kotlin-stdlib-jdk8</artifactId>
+ <groupId>org.jetbrains.kotlin</groupId>
+ </exclusion>
+ </exclusions>
</dependency>
<dependency>
<groupId>org.apache.hugegraph</groupId>
@@ -96,11 +102,37 @@
<version>3.8.0</version>
<scope>compile</scope>
</dependency>
+ <dependency>
+ <groupId>com.squareup.okhttp3</groupId>
+ <artifactId>mockwebserver</artifactId>
+ <version>4.12.0</version>
+ <scope>test</scope>
+ <exclusions>
+ <exclusion>
+ <artifactId>junit</artifactId>
+ <groupId>junit</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>kotlin-stdlib</artifactId>
+ <groupId>org.jetbrains.kotlin</groupId>
+ </exclusion>
+ </exclusions>
+ </dependency>
<dependency>
<groupId>io.fabric8</groupId>
<artifactId>kubernetes-server-mock</artifactId>
<version>5.6.0</version>
<scope>compile</scope>
+ <exclusions>
+ <exclusion>
+ <artifactId>mockwebserver</artifactId>
+ <groupId>com.squareup.okhttp3</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>jackson-databind</artifactId>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ </exclusion>
+ </exclusions>
</dependency>
</dependencies>
diff --git
a/computer-test/src/main/java/org/apache/hugegraph/computer/core/network/netty/NettyTransportClientTest.java
b/computer-test/src/main/java/org/apache/hugegraph/computer/core/network/netty/NettyTransportClientTest.java
index acf48bab..6d132a83 100644
---
a/computer-test/src/main/java/org/apache/hugegraph/computer/core/network/netty/NettyTransportClientTest.java
+++
b/computer-test/src/main/java/org/apache/hugegraph/computer/core/network/netty/NettyTransportClientTest.java
@@ -53,18 +53,12 @@ public class NettyTransportClientTest extends
AbstractNetworkTest {
@Override
protected void initOption() {
- super.updateOption(ComputerOptions.TRANSPORT_MAX_PENDING_REQUESTS,
- 8);
- super.updateOption(ComputerOptions.TRANSPORT_MIN_PENDING_REQUESTS,
- 6);
- super.updateOption(ComputerOptions.TRANSPORT_WRITE_BUFFER_HIGH_MARK,
- 64 * (int) Bytes.MB);
- super.updateOption(ComputerOptions.TRANSPORT_WRITE_BUFFER_LOW_MARK,
- 32 * (int) Bytes.MB);
- super.updateOption(ComputerOptions.TRANSPORT_MIN_ACK_INTERVAL,
- 200L);
- super.updateOption(ComputerOptions.TRANSPORT_FINISH_SESSION_TIMEOUT,
- 30_000L);
+ super.updateOption(ComputerOptions.TRANSPORT_MAX_PENDING_REQUESTS, 8);
+ super.updateOption(ComputerOptions.TRANSPORT_MIN_PENDING_REQUESTS, 6);
+ super.updateOption(ComputerOptions.TRANSPORT_WRITE_BUFFER_HIGH_MARK,
64 * (int) Bytes.MB);
+ super.updateOption(ComputerOptions.TRANSPORT_WRITE_BUFFER_LOW_MARK, 32
* (int) Bytes.MB);
+ super.updateOption(ComputerOptions.TRANSPORT_MIN_ACK_INTERVAL, 200L);
+ super.updateOption(ComputerOptions.TRANSPORT_FINISH_SESSION_TIMEOUT,
30_000L);
}
@Test
@@ -124,12 +118,9 @@ public class NettyTransportClientTest extends
AbstractNetworkTest {
NettyTransportClient client = (NettyTransportClient) this.oneClient();
for (int i = 0; i < 3; i++) {
client.startSession();
- client.send(MessageType.MSG, 1,
- ByteBuffer.wrap(StringEncoding.encode("test1")));
- client.send(MessageType.VERTEX, 2,
- ByteBuffer.wrap(StringEncoding.encode("test2")));
- client.send(MessageType.EDGE, 3,
- ByteBuffer.wrap(StringEncoding.encode("test3")));
+ client.send(MessageType.MSG, 1,
ByteBuffer.wrap(StringEncoding.encode("test1")));
+ client.send(MessageType.VERTEX, 2,
ByteBuffer.wrap(StringEncoding.encode("test2")));
+ client.send(MessageType.EDGE, 3,
ByteBuffer.wrap(StringEncoding.encode("test3")));
client.finishSession();
}
}
@@ -172,8 +163,7 @@ public class NettyTransportClientTest extends
AbstractNetworkTest {
Assert.assertNotSame(sourceBytes, bytes);
return null;
- }).when(serverHandler).handle(Mockito.any(), Mockito.eq(1),
- Mockito.any());
+ }).when(serverHandler).handle(Mockito.any(), Mockito.eq(1),
Mockito.any());
client.startSession();
client.send(MessageType.MSG, 1, ByteBuffer.wrap(sourceBytes1));
@@ -187,35 +177,25 @@ public class NettyTransportClientTest extends
AbstractNetworkTest {
NettyTransportClient client = (NettyTransportClient) this.oneClient();
Function<Message, ChannelFuture> sendFunc = message -> null;
- Whitebox.setInternalState(client.clientSession(),
- "sendFunction", sendFunc);
+ Whitebox.setInternalState(client.clientSession(), "sendFunction",
sendFunc);
- Assert.assertThrows(TransportException.class, () -> {
- client.startSession();
- }, e -> {
- Assert.assertContains("to wait start-response",
- e.getMessage());
+ Assert.assertThrows(TransportException.class, client::startSession, e
-> {
+ Assert.assertContains("to wait start-response", e.getMessage());
});
}
@Test
public void testFinishSessionWithTimeout() throws IOException {
NettyTransportClient client = (NettyTransportClient) this.oneClient();
-
client.startSession();
Function<Message, ChannelFuture> sendFunc = message -> null;
- Whitebox.setInternalState(client.clientSession(),
- "sendFunction", sendFunc);
+ Whitebox.setInternalState(client.clientSession(), "sendFunction",
sendFunc);
- Whitebox.setInternalState(client, "timeoutFinishSession",
- 1000L);
+ Whitebox.setInternalState(client, "timeoutFinishSession", 1000L);
- Assert.assertThrows(TransportException.class, () -> {
- client.finishSession();
- }, e -> {
- Assert.assertContains("to wait finish-response",
- e.getMessage());
+ Assert.assertThrows(TransportException.class, client::finishSession, e
-> {
+ Assert.assertContains("to wait finish-response", e.getMessage());
});
}
@@ -224,18 +204,14 @@ public class NettyTransportClientTest extends
AbstractNetworkTest {
NettyTransportClient client = (NettyTransportClient) this.oneClient();
@SuppressWarnings("unchecked")
- Function<Message, ChannelFuture> sendFunc =
- Mockito.mock(Function.class);
- Whitebox.setInternalState(client.clientSession(),
- "sendFunction", sendFunc);
+ Function<Message, ChannelFuture> sendFunc =
Mockito.mock(Function.class);
+ Whitebox.setInternalState(client.clientSession(), "sendFunction",
sendFunc);
Mockito.doThrow(new RuntimeException("test exception"))
.when(sendFunc)
.apply(Mockito.any());
- Assert.assertThrows(RuntimeException.class, () -> {
- client.startSession();
- }, e -> {
+ Assert.assertThrows(RuntimeException.class, client::startSession, e ->
{
Assert.assertContains("test exception", e.getMessage());
});
}
@@ -243,38 +219,31 @@ public class NettyTransportClientTest extends
AbstractNetworkTest {
@Test
public void testFinishSessionWithSendException() throws IOException {
NettyTransportClient client = (NettyTransportClient) this.oneClient();
-
client.startSession();
@SuppressWarnings("unchecked")
Function<Message, Future<Void>> sendFunc =
Mockito.mock(Function.class);
- Whitebox.setInternalState(client.clientSession(),
- "sendFunction", sendFunc);
+ Whitebox.setInternalState(client.clientSession(), "sendFunction",
sendFunc);
Mockito.doThrow(new RuntimeException("test exception"))
.when(sendFunc)
.apply(Mockito.any());
- Assert.assertThrows(RuntimeException.class, () -> {
- client.finishSession();
- }, e -> {
+ Assert.assertThrows(RuntimeException.class, client::finishSession, e
-> {
Assert.assertContains("test exception", e.getMessage());
});
}
@Test
public void testFlowControl() throws IOException {
- ByteBuffer buffer = ByteBuffer.wrap(
- StringEncoding.encode("test data"));
+ ByteBuffer buffer = ByteBuffer.wrap(StringEncoding.encode("test
data"));
NettyTransportClient client = (NettyTransportClient) this.oneClient();
client.startSession();
- Object sendFuncBak = Whitebox.getInternalState(client.clientSession(),
- "sendFunction");
+ Object sendFuncBak = Whitebox.getInternalState(client.clientSession(),
"sendFunction");
Function<Message, ChannelFuture> sendFunc = message -> null;
- Whitebox.setInternalState(client.clientSession(),
- "sendFunction", sendFunc);
+ Whitebox.setInternalState(client.clientSession(), "sendFunction",
sendFunc);
for (int i = 1; i <= conf.maxPendingRequests() * 2; i++) {
boolean send = client.send(MessageType.MSG, 1, buffer);
@@ -285,10 +254,8 @@ public class NettyTransportClientTest extends
AbstractNetworkTest {
}
}
- int maxRequestId = Whitebox.getInternalState(client.clientSession(),
- "maxRequestId");
- int maxAckId = Whitebox.getInternalState(client.clientSession(),
- "maxAckId");
+ int maxRequestId = Whitebox.getInternalState(client.clientSession(),
"maxRequestId");
+ int maxAckId = Whitebox.getInternalState(client.clientSession(),
"maxAckId");
Assert.assertEquals(conf.maxPendingRequests(), maxRequestId);
Assert.assertEquals(0, maxAckId);
@@ -299,18 +266,15 @@ public class NettyTransportClientTest extends
AbstractNetworkTest {
}
Assert.assertTrue(client.checkSendAvailable());
- maxAckId = Whitebox.getInternalState(client.clientSession(),
- "maxAckId");
+ maxAckId = Whitebox.getInternalState(client.clientSession(),
"maxAckId");
Assert.assertEquals(pendings + 1, maxAckId);
- Whitebox.setInternalState(client.clientSession(), "sendFunction",
- sendFuncBak);
+ Whitebox.setInternalState(client.clientSession(), "sendFunction",
sendFuncBak);
}
@Test
public void testHandlerException() throws IOException {
NettyTransportClient client = (NettyTransportClient) this.oneClient();
-
client.startSession();
        Mockito.doThrow(new RuntimeException("test exception")).when(serverHandler)
@@ -320,14 +284,10 @@ public class NettyTransportClientTest extends
AbstractNetworkTest {
boolean send = client.send(MessageType.MSG, 1, buffer);
Assert.assertTrue(send);
- Whitebox.setInternalState(client, "timeoutFinishSession",
- 1000L);
+ Whitebox.setInternalState(client, "timeoutFinishSession", 1000L);
- Assert.assertThrows(TransportException.class, () -> {
- client.finishSession();
- }, e -> {
- Assert.assertContains("to wait finish-response",
- e.getMessage());
+        Assert.assertThrows(TransportException.class, client::finishSession, e -> {
+ Assert.assertContains("finish-response", e.getMessage());
});
Mockito.verify(serverHandler, Mockito.timeout(10_000L).times(1))
@@ -344,13 +304,11 @@ public class NettyTransportClientTest extends
AbstractNetworkTest {
TransportConf conf = TransportConf.wrapConfig(config);
- Assert.assertThrows(IllegalArgumentException.class,
- conf::minPendingRequests);
+ Assert.assertThrows(IllegalArgumentException.class,
conf::minPendingRequests);
}
@Test
- public void testSessionActive() throws IOException, InterruptedException,
- ExecutionException,
+ public void testSessionActive() throws IOException, InterruptedException,
ExecutionException,
TimeoutException {
NettyTransportClient client = (NettyTransportClient) this.oneClient();
diff --git
a/computer-test/src/main/java/org/apache/hugegraph/computer/core/receiver/MessageRecvManagerTest.java
b/computer-test/src/main/java/org/apache/hugegraph/computer/core/receiver/MessageRecvManagerTest.java
index b423d386..bba8b9e5 100644
---
a/computer-test/src/main/java/org/apache/hugegraph/computer/core/receiver/MessageRecvManagerTest.java
+++
b/computer-test/src/main/java/org/apache/hugegraph/computer/core/receiver/MessageRecvManagerTest.java
@@ -72,17 +72,10 @@ public class MessageRecvManagerTest extends UnitTestBase {
this.fileManager.init(this.config);
this.sortManager = new RecvSortManager(context());
this.sortManager.init(this.config);
- this.receiveManager = new MessageRecvManager(context(),
- this.fileManager,
- this.sortManager);
- this.snapshotManager = new SnapshotManager(context(),
- null,
- receiveManager,
- null);
+ this.receiveManager = new MessageRecvManager(context(),
this.fileManager, this.sortManager);
+ this.snapshotManager = new SnapshotManager(context(), null,
receiveManager, null);
this.receiveManager.init(this.config);
- this.connectionId = new ConnectionId(
- new InetSocketAddress("localhost",8081),
- 0);
+        this.connectionId = new ConnectionId(new InetSocketAddress("localhost", 8081), 0);
}
@After
@@ -94,31 +87,28 @@ public class MessageRecvManagerTest extends UnitTestBase {
@Test
public void testVertexAndEdgeMessage() throws IOException {
- // Send vertex message
+ // Send vertex messages
this.receiveManager.onStarted(this.connectionId);
this.receiveManager.onFinished(this.connectionId);
- VertexMessageRecvPartitionTest.addTenVertexBuffer(
- (NetworkBuffer buffer) -> {
+ VertexMessageRecvPartitionTest.addTenVertexBuffer((NetworkBuffer
buffer) -> {
this.receiveManager.handle(MessageType.VERTEX, 0, buffer);
});
- EdgeMessageRecvPartitionTest.addTenEdgeBuffer(
- (NetworkBuffer buffer) -> {
+ EdgeMessageRecvPartitionTest.addTenEdgeBuffer((NetworkBuffer buffer)
-> {
this.receiveManager.handle(MessageType.EDGE, 0, buffer);
});
- // Send edge message
+ // Send edge messages
this.receiveManager.onStarted(this.connectionId);
this.receiveManager.onFinished(this.connectionId);
this.receiveManager.waitReceivedAllMessages();
Map<Integer, PeekableIterator<KvEntry>> vertexPartitions =
- this.receiveManager.vertexPartitions();
+ this.receiveManager.vertexPartitions();
Map<Integer, PeekableIterator<KvEntry>> edgePartitions =
- this.receiveManager.edgePartitions();
+ this.receiveManager.edgePartitions();
Assert.assertEquals(1, vertexPartitions.size());
Assert.assertEquals(1, edgePartitions.size());
- VertexMessageRecvPartitionTest.checkPartitionIterator(
- vertexPartitions.get(0));
+
VertexMessageRecvPartitionTest.checkPartitionIterator(vertexPartitions.get(0));
EdgeMessageRecvPartitionTest.checkTenEdges(edgePartitions.get(0));
}
@@ -126,9 +116,8 @@ public class MessageRecvManagerTest extends UnitTestBase {
public void testComputeMessage() throws IOException {
// Superstep 0
this.receiveManager.beforeSuperstep(this.config, 0);
- ComputeMessageRecvPartitionTest.addTwentyCombineMessageBuffer(
- (NetworkBuffer buffer) -> {
- this.receiveManager.handle(MessageType.MSG, 0, buffer);
+
ComputeMessageRecvPartitionTest.addTwentyCombineMessageBuffer((NetworkBuffer
buffer) -> {
+ this.receiveManager.handle(MessageType.MSG, 0, buffer);
});
this.receiveManager.onFinished(this.connectionId);
@@ -136,17 +125,15 @@ public class MessageRecvManagerTest extends UnitTestBase {
this.receiveManager.afterSuperstep(this.config, 0);
Map<Integer, PeekableIterator<KvEntry>> messagePartitions =
- this.receiveManager.messagePartitions();
+ this.receiveManager.messagePartitions();
Assert.assertEquals(1, messagePartitions.size());
- ComputeMessageRecvPartitionTest.checkTenCombineMessages(
- messagePartitions.get(0));
+
ComputeMessageRecvPartitionTest.checkTenCombineMessages(messagePartitions.get(0));
}
@Test
public void testOtherMessageType() {
Assert.assertThrows(ComputerException.class, () -> {
- ReceiverUtil.consumeBuffer(new byte[100],
- (NetworkBuffer buffer) -> {
+ ReceiverUtil.consumeBuffer(new byte[100], (NetworkBuffer buffer)
-> {
this.receiveManager.handle(MessageType.ACK, 0, buffer);
});
}, e -> {
@@ -161,8 +148,7 @@ public class MessageRecvManagerTest extends UnitTestBase {
Assert.assertThrows(ComputerException.class, () -> {
this.receiveManager.waitReceivedAllMessages();
}, e -> {
- Assert.assertContains("Expect 1 finish-messages",
- e.getMessage());
+ Assert.assertContains("finish-messages", e.getMessage());
});
}
}
diff --git
a/computer-test/src/main/java/org/apache/hugegraph/computer/core/worker/WorkerServiceTest.java
b/computer-test/src/main/java/org/apache/hugegraph/computer/core/worker/WorkerServiceTest.java
index b056c7d5..3e7d1c0b 100644
---
a/computer-test/src/main/java/org/apache/hugegraph/computer/core/worker/WorkerServiceTest.java
+++
b/computer-test/src/main/java/org/apache/hugegraph/computer/core/worker/WorkerServiceTest.java
@@ -47,56 +47,48 @@ public class WorkerServiceTest extends UnitTestBase {
pool.submit(() -> {
Config config = UnitTestBase.updateWithRequiredOptions(
- ComputerOptions.JOB_ID, "local_002",
- ComputerOptions.JOB_WORKERS_COUNT, "1",
- ComputerOptions.TRANSPORT_SERVER_PORT, "8086",
- ComputerOptions.BSP_REGISTER_TIMEOUT, "100000",
- ComputerOptions.BSP_LOG_INTERVAL, "30000",
- ComputerOptions.BSP_MAX_SUPER_STEP, "2",
- ComputerOptions.WORKER_COMPUTATION_CLASS,
- MockComputation.class.getName(),
- ComputerOptions.ALGORITHM_RESULT_CLASS,
- DoubleValue.class.getName(),
- ComputerOptions.ALGORITHM_MESSAGE_CLASS,
- DoubleValue.class.getName(),
- ComputerOptions.OUTPUT_CLASS,
- LimitedLogOutput.class.getName()
+ ComputerOptions.JOB_ID, "local_002",
+ ComputerOptions.JOB_WORKERS_COUNT, "1",
+ ComputerOptions.TRANSPORT_SERVER_PORT, "8086",
+ ComputerOptions.BSP_REGISTER_TIMEOUT, "100000",
+ ComputerOptions.BSP_LOG_INTERVAL, "30000",
+ ComputerOptions.BSP_MAX_SUPER_STEP, "2",
+ ComputerOptions.WORKER_COMPUTATION_CLASS,
+ MockComputation.class.getName(),
+ ComputerOptions.ALGORITHM_RESULT_CLASS,
+ DoubleValue.class.getName(),
+ ComputerOptions.ALGORITHM_MESSAGE_CLASS,
+ DoubleValue.class.getName(),
+ ComputerOptions.OUTPUT_CLASS,
+ LimitedLogOutput.class.getName()
);
- WorkerService workerService = new MockWorkerService();
- try {
+ try (WorkerService workerService = new MockWorkerService()) {
workerService.init(config);
workerService.execute();
} catch (Throwable e) {
LOG.error("Failed to start worker", e);
exceptions[0] = e;
} finally {
- workerService.close();
- try {
- workerService.close();
- } catch (Throwable e) {
- Assert.fail(e.getMessage());
- }
countDownLatch.countDown();
}
});
pool.submit(() -> {
Config config = UnitTestBase.updateWithRequiredOptions(
- RpcOptions.RPC_SERVER_HOST, "localhost",
- ComputerOptions.JOB_ID, "local_002",
- ComputerOptions.JOB_WORKERS_COUNT, "1",
- ComputerOptions.BSP_REGISTER_TIMEOUT, "100000",
- ComputerOptions.BSP_LOG_INTERVAL, "30000",
- ComputerOptions.BSP_MAX_SUPER_STEP, "2",
- ComputerOptions.MASTER_COMPUTATION_CLASS,
- MockMasterComputation.class.getName(),
- ComputerOptions.ALGORITHM_RESULT_CLASS,
- DoubleValue.class.getName(),
- ComputerOptions.ALGORITHM_MESSAGE_CLASS,
- DoubleValue.class.getName()
+ RpcOptions.RPC_SERVER_HOST, "localhost",
+ ComputerOptions.JOB_ID, "local_002",
+ ComputerOptions.JOB_WORKERS_COUNT, "1",
+ ComputerOptions.BSP_REGISTER_TIMEOUT, "100000",
+ ComputerOptions.BSP_LOG_INTERVAL, "30000",
+ ComputerOptions.BSP_MAX_SUPER_STEP, "2",
+ ComputerOptions.MASTER_COMPUTATION_CLASS,
+ MockMasterComputation.class.getName(),
+ ComputerOptions.ALGORITHM_RESULT_CLASS,
+ DoubleValue.class.getName(),
+ ComputerOptions.ALGORITHM_MESSAGE_CLASS,
+ DoubleValue.class.getName()
);
- MasterService masterService = new MasterService();
- try {
+ try (MasterService masterService = new MasterService()) {
masterService.init(config);
masterService.execute();
} catch (Throwable e) {
@@ -108,12 +100,6 @@ public class WorkerServiceTest extends UnitTestBase {
* if count down is executed first, and the server thread in
* master service will not be closed.
*/
- masterService.close();
- try {
- masterService.close();
- } catch (Throwable e) {
- Assert.fail(e.getMessage());
- }
countDownLatch.countDown();
}
});
@@ -121,8 +107,7 @@ public class WorkerServiceTest extends UnitTestBase {
countDownLatch.await();
pool.shutdownNow();
- Assert.assertFalse(Arrays.asList(exceptions).toString(),
- existError(exceptions));
+ Assert.assertFalse(Arrays.asList(exceptions).toString(),
existError(exceptions));
}
@Test
@@ -133,89 +118,84 @@ public class WorkerServiceTest extends UnitTestBase {
pool.submit(() -> {
Config config = UnitTestBase.updateWithRequiredOptions(
- ComputerOptions.JOB_ID, "local_003",
- ComputerOptions.JOB_WORKERS_COUNT, "2",
- ComputerOptions.JOB_PARTITIONS_COUNT, "2",
- ComputerOptions.TRANSPORT_SERVER_PORT, "8086",
- ComputerOptions.WORKER_DATA_DIRS, "[job_8086]",
- ComputerOptions.BSP_REGISTER_TIMEOUT, "30000",
- ComputerOptions.BSP_LOG_INTERVAL, "10000",
- ComputerOptions.BSP_MAX_SUPER_STEP, "2",
- ComputerOptions.WORKER_COMPUTATION_CLASS,
- MockComputation2.class.getName(),
- ComputerOptions.ALGORITHM_RESULT_CLASS,
- DoubleValue.class.getName(),
- ComputerOptions.ALGORITHM_MESSAGE_CLASS,
- DoubleValue.class.getName()
+ ComputerOptions.JOB_ID, "local_003",
+ ComputerOptions.JOB_WORKERS_COUNT, "2",
+ ComputerOptions.JOB_PARTITIONS_COUNT, "2",
+ ComputerOptions.TRANSPORT_SERVER_PORT, "8086",
+ ComputerOptions.WORKER_DATA_DIRS, "[job_8086]",
+ ComputerOptions.BSP_REGISTER_TIMEOUT, "30000",
+ ComputerOptions.BSP_LOG_INTERVAL, "10000",
+ ComputerOptions.BSP_MAX_SUPER_STEP, "2",
+ ComputerOptions.WORKER_COMPUTATION_CLASS,
+ MockComputation2.class.getName(),
+ ComputerOptions.ALGORITHM_RESULT_CLASS,
+ DoubleValue.class.getName(),
+ ComputerOptions.ALGORITHM_MESSAGE_CLASS,
+ DoubleValue.class.getName()
);
- WorkerService workerService = new MockWorkerService();
- try {
+
+ try (WorkerService workerService = new MockWorkerService()) {
workerService.init(config);
workerService.execute();
} catch (Throwable e) {
LOG.error("Failed to start worker", e);
exceptions[0] = e;
} finally {
- workerService.close();
countDownLatch.countDown();
}
});
pool.submit(() -> {
Config config = UnitTestBase.updateWithRequiredOptions(
- ComputerOptions.JOB_ID, "local_003",
- ComputerOptions.JOB_WORKERS_COUNT, "2",
- ComputerOptions.JOB_PARTITIONS_COUNT, "2",
- ComputerOptions.TRANSPORT_SERVER_PORT, "8087",
- ComputerOptions.WORKER_DATA_DIRS, "[job_8087]",
- ComputerOptions.BSP_REGISTER_TIMEOUT, "30000",
- ComputerOptions.BSP_LOG_INTERVAL, "10000",
- ComputerOptions.BSP_MAX_SUPER_STEP, "2",
- ComputerOptions.WORKER_COMPUTATION_CLASS,
- MockComputation2.class.getName(),
- ComputerOptions.ALGORITHM_RESULT_CLASS,
- DoubleValue.class.getName(),
- ComputerOptions.ALGORITHM_MESSAGE_CLASS,
- DoubleValue.class.getName()
+ ComputerOptions.JOB_ID, "local_003",
+ ComputerOptions.JOB_WORKERS_COUNT, "2",
+ ComputerOptions.JOB_PARTITIONS_COUNT, "2",
+ ComputerOptions.TRANSPORT_SERVER_PORT, "8087",
+ ComputerOptions.WORKER_DATA_DIRS, "[job_8087]",
+ ComputerOptions.BSP_REGISTER_TIMEOUT, "30000",
+ ComputerOptions.BSP_LOG_INTERVAL, "10000",
+ ComputerOptions.BSP_MAX_SUPER_STEP, "2",
+ ComputerOptions.WORKER_COMPUTATION_CLASS,
+ MockComputation2.class.getName(),
+ ComputerOptions.ALGORITHM_RESULT_CLASS,
+ DoubleValue.class.getName(),
+ ComputerOptions.ALGORITHM_MESSAGE_CLASS,
+ DoubleValue.class.getName()
);
- WorkerService workerService = new MockWorkerService();
- try {
+ try (WorkerService workerService = new MockWorkerService()) {
workerService.init(config);
workerService.execute();
} catch (Throwable e) {
LOG.error("Failed to start worker", e);
exceptions[1] = e;
} finally {
- workerService.close();
countDownLatch.countDown();
}
});
pool.submit(() -> {
Config config = UnitTestBase.updateWithRequiredOptions(
- RpcOptions.RPC_SERVER_HOST, "localhost",
- ComputerOptions.JOB_ID, "local_003",
- ComputerOptions.JOB_WORKERS_COUNT, "2",
- ComputerOptions.JOB_PARTITIONS_COUNT, "2",
- ComputerOptions.BSP_REGISTER_TIMEOUT, "30000",
- ComputerOptions.BSP_LOG_INTERVAL, "10000",
- ComputerOptions.BSP_MAX_SUPER_STEP, "2",
- ComputerOptions.MASTER_COMPUTATION_CLASS,
- MockMasterComputation2.class.getName(),
- ComputerOptions.ALGORITHM_RESULT_CLASS,
- DoubleValue.class.getName(),
- ComputerOptions.ALGORITHM_MESSAGE_CLASS,
- DoubleValue.class.getName()
+ RpcOptions.RPC_SERVER_HOST, "localhost",
+ ComputerOptions.JOB_ID, "local_003",
+ ComputerOptions.JOB_WORKERS_COUNT, "2",
+ ComputerOptions.JOB_PARTITIONS_COUNT, "2",
+ ComputerOptions.BSP_REGISTER_TIMEOUT, "30000",
+ ComputerOptions.BSP_LOG_INTERVAL, "10000",
+ ComputerOptions.BSP_MAX_SUPER_STEP, "2",
+ ComputerOptions.MASTER_COMPUTATION_CLASS,
+ MockMasterComputation2.class.getName(),
+ ComputerOptions.ALGORITHM_RESULT_CLASS,
+ DoubleValue.class.getName(),
+ ComputerOptions.ALGORITHM_MESSAGE_CLASS,
+ DoubleValue.class.getName()
);
- MasterService masterService = new MasterService();
- try {
+ try (MasterService masterService = new MasterService()) {
masterService.init(config);
masterService.execute();
} catch (Throwable e) {
LOG.error("Failed to start master", e);
exceptions[2] = e;
} finally {
- masterService.close();
countDownLatch.countDown();
}
});
@@ -223,41 +203,37 @@ public class WorkerServiceTest extends UnitTestBase {
countDownLatch.await();
pool.shutdownNow();
- Assert.assertFalse(Arrays.asList(exceptions).toString(),
- existError(exceptions));
+ Assert.assertFalse(Arrays.asList(exceptions).toString(),
existError(exceptions));
}
@Test
public void testFailToConnectEtcd() {
Config config = UnitTestBase.updateWithRequiredOptions(
- // Unavailable etcd endpoints
- ComputerOptions.BSP_ETCD_ENDPOINTS, "http://abc:8098",
- ComputerOptions.JOB_ID, "local_004",
- ComputerOptions.JOB_WORKERS_COUNT, "1",
- ComputerOptions.BSP_LOG_INTERVAL, "30000",
- ComputerOptions.BSP_MAX_SUPER_STEP, "2",
- ComputerOptions.WORKER_COMPUTATION_CLASS,
- MockComputation.class.getName()
+ // Unavailable etcd endpoints
+ ComputerOptions.BSP_ETCD_ENDPOINTS, "http://invalid-ip:8098",
+ ComputerOptions.JOB_ID, "local_004",
+ ComputerOptions.JOB_WORKERS_COUNT, "1",
+ ComputerOptions.BSP_LOG_INTERVAL, "30000",
+ ComputerOptions.BSP_MAX_SUPER_STEP, "2",
+ ComputerOptions.WORKER_COMPUTATION_CLASS,
+ MockComputation.class.getName()
);
- WorkerService workerService = new MockWorkerService();
- Assert.assertThrows(ComputerException.class, () -> {
- workerService.init(config);
- try {
+
+ try (WorkerService workerService = new MockWorkerService()) {
+ Assert.assertThrows(ComputerException.class, () -> {
+ workerService.init(config);
workerService.execute();
- } finally {
- workerService.close();
- }
- }, e -> {
- Assert.assertContains("Error while getting with " +
- "key='BSP_MASTER_INIT_DONE'",
- e.getMessage());
- Assert.assertContains("UNAVAILABLE: unresolved address",
- e.getCause().getMessage());
- });
+ }, e -> {
+                Assert.assertContains("Error while getting with key='BSP_MASTER_INIT_DONE'",
+ e.getMessage());
+ Assert.assertContains("UNAVAILABLE: unresolved address",
+ e.getCause().getMessage());
+ });
+ }
}
@Test
- public void testDataTransportManagerFail() throws InterruptedException {
+ public void testDataTransportManagerFail() {
/*
* TODO: Complete this test case after data transport manager is
* completed.
diff --git
a/computer-test/src/main/java/org/apache/hugegraph/computer/k8s/KubernetesDriverTest.java
b/computer-test/src/main/java/org/apache/hugegraph/computer/k8s/KubernetesDriverTest.java
index 7ac3e6cb..b95b602e 100644
---
a/computer-test/src/main/java/org/apache/hugegraph/computer/k8s/KubernetesDriverTest.java
+++
b/computer-test/src/main/java/org/apache/hugegraph/computer/k8s/KubernetesDriverTest.java
@@ -79,8 +79,7 @@ public class KubernetesDriverTest extends AbstractK8sTest {
File tempFile = File.createTempFile(UUID.randomUUID().toString(), "");
try {
String absolutePath = tempFile.getAbsolutePath();
- this.updateOptions(KubeDriverOptions.KUBE_CONFIG.name(),
- absolutePath);
+ this.updateOptions(KubeDriverOptions.KUBE_CONFIG.name(),
absolutePath);
NamedCluster cluster = new NamedClusterBuilder()
.withName("kubernetes")
.withNewCluster()
@@ -96,10 +95,11 @@ public class KubernetesDriverTest extends AbstractK8sTest {
.endContext()
.build();
io.fabric8.kubernetes.api.model.Config config = Config.builder()
- .withClusters(cluster)
- .addToContexts(context)
- .withCurrentContext(context.getName())
- .build();
+
.withClusters(cluster)
+
.addToContexts(context)
+
.withCurrentContext(
+
context.getName())
+ .build();
KubeConfigUtils.persistKubeConfigIntoFile(config, absolutePath);
System.setProperty(Config.KUBERNETES_KUBECONFIG_FILE,
absolutePath);
@@ -126,24 +126,18 @@ public class KubernetesDriverTest extends AbstractK8sTest
{
String namespace = Whitebox.getInternalState(this.driver, "namespace");
HugeConfig conf = Whitebox.getInternalState(this.driver, "conf");
Object operation = Whitebox.getInternalState(this.driver, "operation");
- MutableBoolean watchActive = Whitebox.getInternalState(
- this.driver, "watchActive");
+ MutableBoolean watchActive = Whitebox.getInternalState(this.driver,
"watchActive");
Assert.assertTrue(watchActive.booleanValue());
Assert.assertEquals(namespace, "test");
Assert.assertNotNull(conf);
Assert.assertNotNull(operation);
final int workerInstances = 2;
- this.updateOptions(KubeSpecOptions.WORKER_INSTANCES.name(),
- workerInstances);
- Map<String, Object> defaultSpec = Whitebox.invoke(
- KubernetesDriver.class,
- "defaultSpec",
- this.driver);
- String workerInstancesKey = KubeUtil.covertSpecKey(
- KubeSpecOptions.WORKER_INSTANCES.name());
- Assert.assertEquals(defaultSpec.get(workerInstancesKey),
- workerInstances);
+ this.updateOptions(KubeSpecOptions.WORKER_INSTANCES.name(),
workerInstances);
+ Map<String, Object> defaultSpec =
Whitebox.invoke(KubernetesDriver.class,
+ "defaultSpec",
this.driver);
+ String workerInstancesKey =
KubeUtil.covertSpecKey(KubeSpecOptions.WORKER_INSTANCES.name());
+ Assert.assertEquals(defaultSpec.get(workerInstancesKey),
workerInstances);
}
@Test
@@ -167,21 +161,23 @@ public class KubernetesDriverTest extends AbstractK8sTest
{
}
@Test
- public void testUploadAlgorithmJarWithError() throws FileNotFoundException
{
+ public void testUploadAlgorithmJarWithError() {
Whitebox.setInternalState(this.driver, "bashPath",
"conf/images/upload_test-x.sh");
String url = "https://github.com/apache/hugegraph-doc/raw/" +
"binary-1.0/dist/computer/test.jar";
String path = "conf/images/test.jar";
downloadFileByUrl(url, path);
- InputStream inputStream = new FileInputStream(path);
- Assert.assertThrows(ComputerDriverException.class, () -> {
- this.driver.uploadAlgorithmJar("PageRank", inputStream);
- }, e -> {
- ComputerDriverException exception = (ComputerDriverException) e;
- Assert.assertContains("No such file",
- exception.rootCause().getMessage());
- });
+ try (InputStream inputStream = new FileInputStream(path)) {
+ Assert.assertThrows(ComputerDriverException.class, () -> {
+ this.driver.uploadAlgorithmJar("PageRank", inputStream);
+ }, e -> {
+                ComputerDriverException exception = (ComputerDriverException) e;
+ Assert.assertContains("No such file",
exception.rootCause().getMessage());
+ });
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
}
@Test
@@ -189,12 +185,9 @@ public class KubernetesDriverTest extends AbstractK8sTest {
Map<String, String> params = new HashMap<>();
params.put(KubeSpecOptions.WORKER_INSTANCES.name(), "10");
String jobId = this.driver.submitJob("PageRank", params);
- HugeGraphComputerJob computerJob = this.operation
-
.withName(KubeUtil.crName(jobId))
- .get();
+ HugeGraphComputerJob computerJob =
this.operation.withName(KubeUtil.crName(jobId)).get();
Assert.assertNotNull(computerJob);
- Assert.assertEquals(computerJob.getSpec().getAlgorithmName(),
- "PageRank");
+ Assert.assertEquals(computerJob.getSpec().getAlgorithmName(),
"PageRank");
Assert.assertEquals(computerJob.getSpec().getJobId(), jobId);
}
@@ -205,16 +198,13 @@ public class KubernetesDriverTest extends AbstractK8sTest
{
String jobId = this.driver.submitJob("PageRank2", params);
String crName = KubeUtil.crName(jobId);
- HugeGraphComputerJob computerJob = this.operation.withName(crName)
- .get();
+ HugeGraphComputerJob computerJob =
this.operation.withName(crName).get();
Assert.assertNotNull(computerJob);
UnitTestBase.sleep(1000L);
this.driver.cancelJob(jobId, params);
- HugeGraphComputerJob canceledComputerJob = this.operation
- .withName(crName)
- .get();
+ HugeGraphComputerJob canceledComputerJob =
this.operation.withName(crName).get();
Assert.assertNull(canceledComputerJob);
Assert.assertNull(this.driver.jobState(jobId, params));
}
@@ -227,26 +217,21 @@ public class KubernetesDriverTest extends AbstractK8sTest
{
JobObserver jobObserver = Mockito.mock(JobObserver.class);
- CompletableFuture<Void> future = this.driver.waitJobAsync(jobId,
- params,
- jobObserver);
+ CompletableFuture<Void> future = this.driver.waitJobAsync(jobId,
params, jobObserver);
Mockito.verify(jobObserver, Mockito.timeout(5000L).atLeast(1))
.onJobStateChanged(Mockito.any(DefaultJobState.class));
future.getNow(null);
- MutableBoolean watchActive = Whitebox.getInternalState(this.driver,
- "watchActive");
+ MutableBoolean watchActive = Whitebox.getInternalState(this.driver,
"watchActive");
watchActive.setFalse();
this.driver.waitJobAsync(jobId, params, jobObserver);
this.driver.cancelJob(jobId, params);
UnitTestBase.sleep(1000L);
- CompletableFuture<Void> future2 = this.driver.waitJobAsync(jobId,
- params,
-
jobObserver);
+ CompletableFuture<Void> future2 = this.driver.waitJobAsync(jobId,
params, jobObserver);
Assert.assertNull(future2);
}
@@ -264,15 +249,12 @@ public class KubernetesDriverTest extends AbstractK8sTest
{
@Test
public void testOnClose() {
Map<String, Pair<CompletableFuture<Void>, JobObserver>> waits =
- Whitebox.getInternalState(this.driver, "waits");
- waits.put("test-123", Pair.of(new CompletableFuture<>(),
- Mockito.mock(JobObserver.class)));
+ Whitebox.getInternalState(this.driver, "waits");
+ waits.put("test-123", Pair.of(new CompletableFuture<>(),
Mockito.mock(JobObserver.class)));
- AbstractWatchManager<HugeGraphComputerJob> watch =
- Whitebox.getInternalState(
- this.driver, "watch");
- Watcher<HugeGraphComputerJob> watcher = Whitebox.getInternalState(
- watch, "watcher");
+ AbstractWatchManager<HugeGraphComputerJob> watch =
Whitebox.getInternalState(this.driver,
+
"watch");
+ Watcher<HugeGraphComputerJob> watcher =
Whitebox.getInternalState(watch, "watcher");
watcher.eventReceived(Watcher.Action.ADDED, null);
watcher.eventReceived(Watcher.Action.ERROR, new
HugeGraphComputerJob());
@@ -283,8 +265,7 @@ public class KubernetesDriverTest extends AbstractK8sTest {
WatcherException testClose = new WatcherException("test close");
watcher.onClose(testClose);
- MutableBoolean watchActive = Whitebox.getInternalState(this.driver,
- "watchActive");
+ MutableBoolean watchActive = Whitebox.getInternalState(this.driver,
"watchActive");
Assert.assertFalse(watchActive.booleanValue());
}
@@ -297,14 +278,12 @@ public class KubernetesDriverTest extends AbstractK8sTest
{
Assert.assertThrows(IllegalArgumentException.class, () -> {
this.driver.submitJob("PageRank3", params);
}, e -> {
- Assert.assertContains(
- "The partitions count must be >= workers instances",
- e.getMessage()
+            Assert.assertContains("The partitions count must be >= workers instances",
+ e.getMessage()
);
});
- Map<String, String> defaultConf = Whitebox.getInternalState(
- this.driver, "defaultConf");
+ Map<String, String> defaultConf =
Whitebox.getInternalState(this.driver, "defaultConf");
defaultConf = new HashMap<>(defaultConf);
defaultConf.remove(ComputerOptions.ALGORITHM_PARAMS_CLASS.name());
Whitebox.setInternalState(this.driver, "defaultConf", defaultConf);
@@ -312,9 +291,8 @@ public class KubernetesDriverTest extends AbstractK8sTest {
Assert.assertThrows(IllegalArgumentException.class, () -> {
this.driver.submitJob("PageRank3", params);
}, e -> {
- Assert.assertContains(
- "The [algorithm.params_class] options can't be null",
- e.getMessage()
+            Assert.assertContains("The [algorithm.params_class] options can't be null",
+ e.getMessage()
);
});
}
diff --git a/pom.xml b/pom.xml
index 2784670b..a0d0634d 100644
--- a/pom.xml
+++ b/pom.xml
@@ -31,7 +31,8 @@
<name>hugegraph-computer</name>
<url>https://github.com/apache/hugegraph-computer</url>
<description>
- hugegraph-computer is a fast-speed, highly-scalable, fault-tolerance
graph processing system developed by apache.
+ hugegraph-computer is a fast-speed, highly scalable, fault-tolerance
graph processing
+ system developed by apache.
</description>
<inceptionYear>2020</inceptionYear>
@@ -88,7 +89,12 @@
</prerequisites>
<properties>
- <revision>1.0.0</revision>
+ <!-- TODO: update the version after toolchain v1.2 fixed -->
+ <revision>1.2.0</revision>
+ <hugegraph-common-version>1.2.0</hugegraph-common-version>
+ <hugegraph-client-version>1.2.0</hugegraph-client-version>
+ <hugegraph-rpc-version>1.2.0</hugegraph-rpc-version>
+ <hugegraph-loader-version>1.2.0</hugegraph-loader-version>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<top.level.dir>${project.basedir}/..</top.level.dir>
<release.name>hugegraph-computer</release.name>
@@ -100,10 +106,6 @@
<hadoop-version>3.1.2</hadoop-version>
<netty-version>4.1.42.Final</netty-version>
<commons-lang3-version>3.12.0</commons-lang3-version>
- <hugegraph-common-version>1.0.0</hugegraph-common-version>
- <hugegraph-client-version>1.0.0</hugegraph-client-version>
- <hugegraph-rpc-version>1.0.0</hugegraph-rpc-version>
- <hugegraph-loader-version>1.0.0</hugegraph-loader-version>
<minio-version>8.5.6</minio-version>
</properties>
@@ -347,7 +349,7 @@
</plugin>
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
- <!-- Keep fix version to avoid computer-k8s-operator build
error -->
+ <!-- Keep a fixed version to avoid computer-k8s-operator build
error -->
<version>3.1</version>
<configuration>
<source>${compiler.source}</source>
@@ -526,5 +528,15 @@
</plugins>
</build>
</profile>
+ <!-- use mvn -P stage to enable the remote apache-stage repo -->
+ <profile>
+ <id>stage</id>
+ <repositories>
+ <repository>
+ <id>staged-releases</id>
+
<url>https://repository.apache.org/content/groups/staging/</url>
+ </repository>
+ </repositories>
+ </profile>
</profiles>
</project>