sijie closed pull request #1771:  [table service] Fix python client can't read 
keys written by java clients
URL: https://github.com/apache/bookkeeper/pull/1771
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/pom.xml b/pom.xml
index af3cefa88a..55afe3fbf1 100644
--- a/pom.xml
+++ b/pom.xml
@@ -168,6 +168,7 @@
     <cobertura-maven-plugin.version>2.7</cobertura-maven-plugin.version>
     <coveralls-maven-plugin.version>4.3.0</coveralls-maven-plugin.version>
     <dockerfile-maven-plugin.version>1.3.7</dockerfile-maven-plugin.version>
+    <exec-maven-plugin.version>1.6.0</exec-maven-plugin.version>
     <license-maven-plugin.version>1.6</license-maven-plugin.version>
     <jacoco-maven-plugin.version>0.8.0</jacoco-maven-plugin.version>
     <maven-antrun-plugin.version>1.8</maven-antrun-plugin.version>
diff --git a/stream/clients/python/bookkeeper/admin/client.py 
b/stream/clients/python/bookkeeper/admin/client.py
index fdb82d2699..80de76d7b6 100644
--- a/stream/clients/python/bookkeeper/admin/client.py
+++ b/stream/clients/python/bookkeeper/admin/client.py
@@ -16,7 +16,6 @@
 
 import grpc
 import logging
-import pkg_resources
 
 from bookkeeper import types
 from bookkeeper.admin.namespace import Namespace
@@ -24,7 +23,6 @@
 from bookkeeper.common.service_uri import ServiceURI
 from bookkeeper.proto.storage_pb2_grpc import RootRangeServiceStub
 
-__version__ = pkg_resources.get_distribution('bookkeeper').version
 __logger__ = logging.getLogger("bookkeeper.admin.Client")
 
 
diff --git a/stream/clients/python/bookkeeper/admin/namespace.py 
b/stream/clients/python/bookkeeper/admin/namespace.py
index 5c3910b1ce..631673a5e9 100644
--- a/stream/clients/python/bookkeeper/admin/namespace.py
+++ b/stream/clients/python/bookkeeper/admin/namespace.py
@@ -14,9 +14,12 @@
 
 from __future__ import absolute_import
 
-from bookkeeper.common.constants import __DEFAULT_STREAM_CONF__
+from bookkeeper.common.constants import __DEFAULT_TABLE_CONF__
 from bookkeeper.common.constants import __ROOT_RANGE_METADATA__
 from bookkeeper.common.exceptions import from_root_range_rpc_response
+from bookkeeper.common.exceptions import InternalServerError
+from bookkeeper.common.exceptions import StreamExistsError
+from bookkeeper.common.exceptions import StreamNotFoundError
 from bookkeeper.common.method import wrap_method
 from bookkeeper.common.retry import Retry
 from bookkeeper.common.timeout import ExponentialTimeout
@@ -38,7 +41,7 @@ def __init__(self, root_range_service, namespace):
         self.__delete_with_retries__ =\
             wrap_method(self.__delete_stream__, self.__default_retry__)
 
-    def create(self, stream_name, stream_config=__DEFAULT_STREAM_CONF__):
+    def create(self, stream_name, stream_config=__DEFAULT_TABLE_CONF__):
         return self.__create_with_retries__(stream_name, stream_config)
 
     def __create_stream__(self, stream_name, stream_config):
@@ -51,8 +54,18 @@ def __create_stream__(self, stream_name, stream_config):
             request=create_stream_req,
             metadata=__ROOT_RANGE_METADATA__
         )
-        create_stream_resp = from_root_range_rpc_response(create_stream_resp)
-        return create_stream_resp.stream_props
+        try:
+            create_stream_resp = from_root_range_rpc_response(create_stream_resp)
+            return create_stream_resp.stream_props
+        except InternalServerError as ise:
+            # currently if a stream exists, it also throws
+            # internal server error
+            try:
+                self.get(stream_name=stream_name)
+                raise StreamExistsError("stream '%s' already exists at namespace '%s'"
+                                        % (stream_name, self.__namespace__))
+            except StreamNotFoundError:
+                raise ise
 
     def get(self, stream_name):
         return self.__get_with_retries__(stream_name)
diff --git a/stream/clients/python/bookkeeper/admin/namespaces.py 
b/stream/clients/python/bookkeeper/admin/namespaces.py
index 3df8cbd57d..d0e66352ff 100644
--- a/stream/clients/python/bookkeeper/admin/namespaces.py
+++ b/stream/clients/python/bookkeeper/admin/namespaces.py
@@ -17,6 +17,9 @@
 from bookkeeper.common.constants import __DEFAULT_NS_CONF__
 from bookkeeper.common.constants import __ROOT_RANGE_METADATA__
 from bookkeeper.common.exceptions import from_root_range_rpc_response
+from bookkeeper.common.exceptions import InternalServerError
+from bookkeeper.common.exceptions import NamespaceExistsError
+from bookkeeper.common.exceptions import NamespaceNotFoundError
 from bookkeeper.common.method import wrap_method
 from bookkeeper.common.retry import Retry
 from bookkeeper.common.timeout import ExponentialTimeout
@@ -48,8 +51,17 @@ def __create_ns__(self, namespace, namespace_config):
             request=create_ns_req,
             metadata=__ROOT_RANGE_METADATA__
         )
-        create_ns_resp = from_root_range_rpc_response(create_ns_resp)
-        return create_ns_resp.ns_props
+        try:
+            create_ns_resp = from_root_range_rpc_response(create_ns_resp)
+            return create_ns_resp.ns_props
+        except InternalServerError as ise:
+            # currently if a namespace exists, it also throws
+            # internal server error.
+            try:
+                self.get(namespace=namespace)
+                raise NamespaceExistsError("namespace '%s' already exists" % namespace)
+            except NamespaceNotFoundError:
+                raise ise
 
     def get(self, namespace):
         return self.__get_with_retries__(namespace)
diff --git a/stream/clients/python/bookkeeper/common/constants.py 
b/stream/clients/python/bookkeeper/common/constants.py
index b21f4bcceb..8c72791ff0 100644
--- a/stream/clients/python/bookkeeper/common/constants.py
+++ b/stream/clients/python/bookkeeper/common/constants.py
@@ -20,6 +20,28 @@
         ('bk-rt-sc-id-bin', util.to_bytes(__ROOT_RANGE_ID__, 8, "big"))
 ]
 __DEFAULT_STREAM_CONF__ = stream_pb2.StreamConfiguration(
+        key_type=stream_pb2.RangeKeyType.values()[0],
+        min_num_ranges=24,
+        initial_num_ranges=4,
+        split_policy=stream_pb2.SplitPolicy(
+                type=stream_pb2.SplitPolicyType.values()[0],
+                fixed_range_policy=stream_pb2.FixedRangeSplitPolicy(
+                        num_ranges=2
+                )
+        ),
+        rolling_policy=stream_pb2.SegmentRollingPolicy(
+                size_policy=stream_pb2.SizeBasedSegmentRollingPolicy(
+                        max_segment_size=128*1024*1024
+                )
+        ),
+        retention_policy=stream_pb2.RetentionPolicy(
+                time_policy=stream_pb2.TimeBasedRetentionPolicy(
+                        retention_minutes=-1
+                )
+        ),
+        storage_type=stream_pb2.StorageType.values()[0]
+)
+__DEFAULT_TABLE_CONF__ = stream_pb2.StreamConfiguration(
         key_type=stream_pb2.RangeKeyType.values()[0],
         min_num_ranges=24,
         initial_num_ranges=4,
diff --git a/stream/clients/python/bookkeeper/common/exceptions.py 
b/stream/clients/python/bookkeeper/common/exceptions.py
index 782a579d5b..531b79a88d 100644
--- a/stream/clients/python/bookkeeper/common/exceptions.py
+++ b/stream/clients/python/bookkeeper/common/exceptions.py
@@ -98,7 +98,8 @@ def __init__(self, message, errors=(), response=None):
         self._response = response
 
     def __str__(self):
-        return '{} {}'.format(self.code, self.message)
+        return 'grpc_status_code = {}, bk_status_code = {} : {}'\
+            .format(self.grpc_status_code, self.bk_status_code, self.message)
 
     @property
     def errors(self):
diff --git a/stream/clients/python/bookkeeper/kv/client.py 
b/stream/clients/python/bookkeeper/kv/client.py
index 3de1291990..e942fa634f 100644
--- a/stream/clients/python/bookkeeper/kv/client.py
+++ b/stream/clients/python/bookkeeper/kv/client.py
@@ -16,7 +16,6 @@
 
 import grpc
 import logging
-import pkg_resources
 
 from bookkeeper import types
 from bookkeeper.admin.namespace import Namespace
@@ -24,7 +23,6 @@
 from bookkeeper.kv.table import Table
 from bookkeeper.proto.storage_pb2_grpc import RootRangeServiceStub
 
-__version__ = pkg_resources.get_distribution('bookkeeper').version
 __logger__ = logging.getLogger("bookkeeper.kv.Client")
 
 
diff --git a/stream/clients/python/bookkeeper/kv/table.py 
b/stream/clients/python/bookkeeper/kv/table.py
index 605ab5d525..c5be5cad5e 100644
--- a/stream/clients/python/bookkeeper/kv/table.py
+++ b/stream/clients/python/bookkeeper/kv/table.py
@@ -40,6 +40,8 @@ def __init__(self, channel, stream_props):
             wrap_method(self.__do_get__, self.__default_retry__)
         self.__del_with_retries__ =\
             wrap_method(self.__do_del__, self.__default_retry__)
+        self.__incr_with_retries__ =\
+            wrap_method(self.__do_incr__, self.__default_retry__)
         __logger__.info("initialized table instance with properties : %s",
                         stream_props)
 
@@ -79,6 +81,27 @@ def __do_put__(self, key, value, routing_header, grpc_metadata):
         )
         from_table_rpc_response(put_resp)
 
+    def incr_str(self, key_str, amount):
+        key = key_str.encode('utf-8')
+        return self.incr(key, amount)
+
+    def incr(self, key, amount):
+        metadata = self.__make_routing_metadata__(key)
+        header = self.__make_routing_header__(key)
+        return self.__incr_with_retries__(key, amount, header, metadata)
+
+    def __do_incr__(self, key, amount, routing_header, grpc_metadata):
+        incr_req = kv_rpc_pb2.IncrementRequest(
+            key=key,
+            amount=amount,
+            header=routing_header
+        )
+        incr_resp = self.__table_service__.Increment(
+            request=incr_req,
+            metadata=grpc_metadata
+        )
+        from_table_rpc_response(incr_resp)
+
     def get_str(self, key_str):
         key = key_str.encode('utf-8')
         return self.get(key)
diff --git a/stream/clients/python/nox.py b/stream/clients/python/nox.py
index 5966201c4a..6bcb2d9f77 100644
--- a/stream/clients/python/nox.py
+++ b/stream/clients/python/nox.py
@@ -45,6 +45,33 @@ def default(session):
         *session.posargs
     )
 
+@nox.session
+def integration(session):
+    """Default integration test session.
+    This is intended to be run **without** an interpreter set, so
+    that the current ``python`` (on the ``PATH``) or the version of
+    Python corresponding to the ``nox`` binary the ``PATH`` can
+    run the tests.
+    """
+    # Install all test dependencies, then install local packages in-place.
+    session.install('pytest', 'pytest-cov')
+    for local_dep in LOCAL_DEPS:
+        session.install('-e', local_dep)
+    session.install('-e', '.')
+
+    # Run py.test against the unit tests.
+    session.run(
+        'py.test',
+        '--quiet',
+        '--cov-append',
+        '--cov-report=',
+        '--cov=bookkeeper',
+        '--cov-config=.coveragerc',
+        os.path.join('tests', 'integration'),
+        *session.posargs
+    )
+
+
 
 @nox.session
 def lint(session):
diff --git a/stream/clients/python/noxfile.py b/stream/clients/python/noxfile.py
index 3027c9212c..612afc1742 100644
--- a/stream/clients/python/noxfile.py
+++ b/stream/clients/python/noxfile.py
@@ -52,6 +52,33 @@ def unit(session):
     default(session)
 
 
+@nox.session(python=[os.environ['PY_VERSION']])
+def integration(session):
+    """Default integration test session.
+    This is intended to be run **without** an interpreter set, so
+    that the current ``python`` (on the ``PATH``) or the version of
+    Python corresponding to the ``nox`` binary the ``PATH`` can
+    run the tests.
+    """
+    # Install all test dependencies, then install local packages in-place.
+    session.install('pytest', 'pytest-cov')
+    for local_dep in LOCAL_DEPS:
+        session.install('-e', local_dep)
+    session.install('-e', '.')
+
+    # Run py.test against the unit tests.
+    session.run(
+        'py.test',
+        '--quiet',
+        '--cov-append',
+        '--cov-report=',
+        '--cov=bookkeeper',
+        '--cov-config=.coveragerc',
+        os.path.join('tests', 'integration'),
+        *session.posargs
+    )
+
+
 @nox.session
 def lint(session):
     """Run linters.
diff --git a/stream/clients/python/scripts/docker_build.sh 
b/stream/clients/python/scripts/docker_build.sh
new file mode 100755
index 0000000000..f3bd918f80
--- /dev/null
+++ b/stream/clients/python/scripts/docker_build.sh
@@ -0,0 +1,34 @@
+#!/bin/bash
+
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+set -e -x -u
+
+SCRIPT_DIR=$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )
+PY_VERSION=${PY_VERSION:-"3.7"}
+
+COMMANDS=`cat <<EOF
+pip install --upgrade setuptools wheel
+python3 setup.py sdist bdist_wheel
+EOF
+`
+
+docker run \
+    -v "${SCRIPT_DIR}/..":/opt/bookkeeper \
+    -w /opt/bookkeeper \
+    -e PY_VERSION=${PY_VERSION} \
+    python:${PY_VERSION} \
+    /bin/bash -c "${COMMANDS}"
diff --git a/stream/clients/python/scripts/docker_integration_tests.sh 
b/stream/clients/python/scripts/docker_integration_tests.sh
new file mode 100755
index 0000000000..2480924e8d
--- /dev/null
+++ b/stream/clients/python/scripts/docker_integration_tests.sh
@@ -0,0 +1,29 @@
+#!/bin/bash
+
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+set -e -x -u
+
+SCRIPT_DIR=$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )
+NOXSESSION=${NOXSESSION:-"integration"}
+
+docker run \
+    -v "${SCRIPT_DIR}/..":/opt/bookkeeper_python_client \
+    -w /opt/bookkeeper_python_client \
+    -e NOXSESSION="${NOXSESSION}" \
+    --entrypoint=/bin/bash \
+    apachebookkeeper/bookkeeper-current \
+    /opt/bookkeeper_python_client/scripts/run_integration_tests.sh
diff --git a/stream/clients/python/scripts/run_integration_tests.sh 
b/stream/clients/python/scripts/run_integration_tests.sh
new file mode 100755
index 0000000000..637b98a77d
--- /dev/null
+++ b/stream/clients/python/scripts/run_integration_tests.sh
@@ -0,0 +1,41 @@
+#!/bin/bash
+
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+set -e -x -u
+
+BK_HOME=/opt/bookkeeper
+
+echo "starting bookkeeper standalone ..."
+${BK_HOME}/bin/standalone process up
+
+echo "installing nox ..."
+find . | grep -E "(__pycache__|\.pyc|\.pyo$)" | xargs rm -rf
+pip install nox-automation
+echo "installed nox."
+
+TABLE="test-java-updates"
+echo "creating test table ..."
+${BK_HOME}/bin/bkctl tables create -r 1 ${TABLE}
+for x in {0..20}; do
+    echo "write kv pair '${x}'"
+    ${BK_HOME}/bin/bkctl table put ${TABLE} java-key-$x java-value-$x;
+done
+echo "ingested kv pairs for testing."
+
+echo "run integration tests"
+nox --session integration
+echo "done integration tests"
diff --git a/stream/clients/python/setup.py b/stream/clients/python/setup.py
index 74a8b0ca07..1dfbbe47c3 100644
--- a/stream/clients/python/setup.py
+++ b/stream/clients/python/setup.py
@@ -19,7 +19,7 @@
 
 name = 'apache-bookkeeper-client'
 description = 'Apache BookKeeper client library'
-version = '4.9.0-alpha-0'
+version = '4.9.0-alpha-2'
 # Should be one of:
 # 'Development Status :: 3 - Alpha'
 # 'Development Status :: 4 - Beta'
diff --git a/stream/clients/python/tests/integration/bookkeeper/__init__.py 
b/stream/clients/python/tests/integration/bookkeeper/__init__.py
new file mode 100644
index 0000000000..4d9a92490b
--- /dev/null
+++ b/stream/clients/python/tests/integration/bookkeeper/__init__.py
@@ -0,0 +1,11 @@
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
diff --git 
a/stream/clients/python/tests/integration/bookkeeper/admin/test_admin_client.py 
b/stream/clients/python/tests/integration/bookkeeper/admin/test_admin_client.py
new file mode 100644
index 0000000000..fd403ef611
--- /dev/null
+++ 
b/stream/clients/python/tests/integration/bookkeeper/admin/test_admin_client.py
@@ -0,0 +1,81 @@
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from bookkeeper import admin
+from bookkeeper.common.exceptions import NamespaceExistsError
+from bookkeeper.common.exceptions import NamespaceNotFoundError
+from bookkeeper.common.exceptions import StreamExistsError
+from bookkeeper.common.exceptions import StreamNotFoundError
+from bookkeeper.types import StorageClientSettings
+import logging
+import pytest
+import uuid
+
+__logger__ = logging.getLogger("bookkeeper.admin.test_admin_client")
+
+
+def test_create_delete_namespaces():
+    settings = StorageClientSettings(service_uri="bk://localhost:4181")
+    client = admin.Client(storage_client_settings=settings)
+    ns = "test_create_delete_namespaces_%s" % uuid.uuid4().hex
+    with pytest.raises(NamespaceNotFoundError):
+        __logger__.info("getting non-existent namespace '%s'", ns)
+        client.namespaces().get(ns)
+    __logger__.info("creating namespace '%s'", ns)
+    ns_props = client.namespaces().create(ns)
+    __logger__.info("created namespace '%s' : %s", ns, ns_props)
+    __logger__.info("getting namespace '%s'", ns)
+    read_ns_props = client.namespaces().get(ns)
+    __logger__.info("got namespace '%s' : %s", ns, read_ns_props)
+    assert ns_props == read_ns_props
+    with pytest.raises(NamespaceExistsError):
+        __logger__.info("creating existed namespace '%s'", ns)
+        client.namespaces().create(ns)
+    __logger__.info("deleting existed namespace '%s'", ns)
+    client.namespaces().delete(ns)
+    with pytest.raises(NamespaceNotFoundError):
+        client.namespaces().get(ns)
+    with pytest.raises(NamespaceNotFoundError):
+        client.namespaces().delete(ns)
+    __logger__.info("end of test_create_delete_namespace")
+
+
+def test_create_delete_tables():
+    settings = StorageClientSettings(service_uri="bk://localhost:4181")
+    client = admin.Client(storage_client_settings=settings)
+    ns_name = "test_create_delete_tables_%s" % uuid.uuid4().hex
+    ns_props = client.namespaces().create(ns_name)
+    __logger__.info("Created namespace '%s' : %s", ns_name, ns_props)
+    ns = client.namespace(ns_name)
+
+    table_name = "table_%s" % uuid.uuid4().hex
+    # test create, delete and get tables
+    with pytest.raises(StreamNotFoundError):
+        __logger__.info("getting non-existent table '%s'", table_name)
+        ns.get(table_name)
+    __logger__.info("creating table '%s'", table_name)
+    table_props = ns.create(table_name)
+    __logger__.info("created table '%s' : %s", table_name, table_props)
+    __logger__.info("getting table '%s'", table_name)
+    read_tbl_props = ns.get(table_name)
+    __logger__.info("got table '%s' : %s", table_name, read_tbl_props)
+    assert table_props == read_tbl_props
+    with pytest.raises(StreamExistsError):
+        __logger__.info("creating existed table '%s'", table_name)
+        ns.create(table_name)
+    __logger__.info("deleting existed table '%s'", table_name)
+    ns.delete(table_name)
+    with pytest.raises(StreamNotFoundError):
+        ns.get(table_name)
+    with pytest.raises(StreamNotFoundError):
+        ns.delete(table_name)
+    __logger__.info("end of test_create_delete_tables")
diff --git 
a/stream/clients/python/tests/integration/bookkeeper/kv/test_client.py 
b/stream/clients/python/tests/integration/bookkeeper/kv/test_client.py
new file mode 100644
index 0000000000..b43d99e9df
--- /dev/null
+++ b/stream/clients/python/tests/integration/bookkeeper/kv/test_client.py
@@ -0,0 +1,111 @@
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from bookkeeper import admin, kv
+from bookkeeper.types import StorageClientSettings
+from bookkeeper.common.exceptions import BadRequest
+from bookkeeper.proto import storage_pb2
+import grpc
+import logging
+import pytest
+import uuid
+
+__logger__ = logging.getLogger("bookkeeper.kv.test_client")
+
+
+def test_kv_ops():
+    settings = StorageClientSettings(service_uri="bk://localhost:4181")
+    admin_client = admin.Client(storage_client_settings=settings)
+    ns = "test_kv_ops_%s" % uuid.uuid4().hex
+    ns_props = admin_client.namespaces().create(ns)
+    __logger__.info("Created namespace '%s' : %s", ns, ns_props)
+    tbl_name = "test_kv_ops_table"
+    tbl_props = admin_client.namespace(ns).create(tbl_name)
+    __logger__.info("Created table '%s' at namespace '%s' : %s", tbl_name, ns, tbl_props)
+
+    kv_client = kv.Client(storage_client_settings=settings, namespace=ns)
+    tbl = kv_client.table(tbl_name)
+
+    for x in range(0, 20):
+        read_val = tbl.get_str("key-%s" % x)
+        assert read_val is None
+
+    for x in range(0, 20):
+        tbl.put_str("key-%s" % x, "value-%s" % x)
+
+    for x in range(0, 20):
+        read_kv = tbl.get_str("key-%s" % x)
+        expected_key = "key-%s" % x
+        expected_value = "value-%s" % x
+        assert read_kv.key == str.encode(expected_key, 'utf-8')
+        assert read_kv.is_number is False
+        assert read_kv.value == str.encode(expected_value, 'utf-8')
+        assert read_kv.version == 0
+
+    for x in range(0, 20):
+        try:
+            tbl.incr_str("key-%s" % x, 20)
+            assert False
+        except BadRequest as e:
+            assert e.grpc_status_code == grpc.StatusCode.FAILED_PRECONDITION
+            assert e.bk_status_code == storage_pb2.BAD_REQUEST
+
+    for x in range(0, 20):
+        read_val = tbl.get_str("counter-%s" % x)
+        assert read_val is None
+
+    for x in range(0, 20):
+        tbl.incr_str("counter-%s" % x, (x + 1))
+
+    for x in range(0, 20):
+        read_kv = tbl.get_str("counter-%s" % x)
+        expected_key = "counter-%s" % x
+        expected_num = (x + 1)
+        assert read_kv.key == str.encode(expected_key, 'utf-8')
+        assert read_kv.is_number is True
+        assert read_kv.number_value == expected_num
+        assert read_kv.version == 0
+
+    for x in range(0, 20):
+        try:
+            tbl.put_str("counter-%s" % x, "value-%s" % x)
+            assert False
+        except BadRequest as e:
+            assert e.grpc_status_code == grpc.StatusCode.FAILED_PRECONDITION
+            assert e.bk_status_code == storage_pb2.BAD_REQUEST
+
+    for x in range(0, 20):
+        tbl.delete_str("key-%s" % x)
+        read_val = tbl.get_str("key-%s" % x)
+        assert read_val is None
+
+    for x in range(0, 20):
+        tbl.delete_str("counter-%s" % x)
+        read_val = tbl.get_str("counter-%s" % x)
+        assert read_val is None
+
+
+def test_get_kv_from_table_updated_by_java_client():
+    settings = StorageClientSettings(service_uri="bk://localhost:4181")
+    ns = "default"
+    tbl_name = "test-java-updates"
+    kv_client = kv.Client(storage_client_settings=settings, namespace=ns)
+    tbl = kv_client.table(tbl_name)
+
+    for x in range(0, 20):
+        expected_key = "java-key-%s" % x
+        read_kv = tbl.get_str(expected_key)
+        expected_value = "java-value-%s" % x
+        assert read_kv.key == str.encode(expected_key, 'utf-8')
+        assert read_kv.is_number is False
+        assert read_kv.value == str.encode(expected_value, 'utf-8')
+        assert read_kv.version == 0
diff --git 
a/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/sc/StorageContainerImpl.java
 
b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/sc/StorageContainerImpl.java
index 4bdc801568..b3c67f20fd 100644
--- 
a/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/sc/StorageContainerImpl.java
+++ 
b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/sc/StorageContainerImpl.java
@@ -32,6 +32,7 @@
 import org.apache.bookkeeper.stream.storage.api.sc.StorageContainer;
 import org.apache.bookkeeper.stream.storage.api.sc.StorageContainerService;
 import org.apache.bookkeeper.stream.storage.api.sc.StorageContainerServiceFactory;
+import org.apache.bookkeeper.stream.storage.impl.routing.RoutingHeaderProxyInterceptor;
 
 /**
  * The default implementation of {@link StorageContainer}.
@@ -82,6 +83,8 @@ public long getId() {
                 channel = InProcessChannelBuilder.forName(containerName)
                     .usePlaintext()
                     .directExecutor()
+                    // attach routing header interceptor
+                    .intercept(new RoutingHeaderProxyInterceptor())
                     .build();
                 return FutureUtils.value(StorageContainerImpl.this);
             } catch (IOException e) {
diff --git 
a/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/service/RangeStoreServiceImpl.java
 
b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/service/RangeStoreServiceImpl.java
index e3b2d3991d..5fe1612f65 100644
--- 
a/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/service/RangeStoreServiceImpl.java
+++ 
b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/service/RangeStoreServiceImpl.java
@@ -41,6 +41,7 @@
 import org.apache.bookkeeper.stream.proto.kv.rpc.PutResponse;
 import org.apache.bookkeeper.stream.proto.kv.rpc.RangeRequest;
 import org.apache.bookkeeper.stream.proto.kv.rpc.RangeResponse;
+import org.apache.bookkeeper.stream.proto.kv.rpc.ResponseHeader;
 import org.apache.bookkeeper.stream.proto.kv.rpc.RoutingHeader;
 import org.apache.bookkeeper.stream.proto.kv.rpc.TxnRequest;
 import org.apache.bookkeeper.stream.proto.kv.rpc.TxnResponse;
@@ -58,6 +59,7 @@
 import org.apache.bookkeeper.stream.proto.storage.GetNamespaceResponse;
 import org.apache.bookkeeper.stream.proto.storage.GetStreamRequest;
 import org.apache.bookkeeper.stream.proto.storage.GetStreamResponse;
+import org.apache.bookkeeper.stream.proto.storage.StatusCode;
 import org.apache.bookkeeper.stream.protocol.RangeId;
 import org.apache.bookkeeper.stream.protocol.util.StorageContainerPlacementPolicy;
 import org.apache.bookkeeper.stream.storage.api.kv.TableStore;
@@ -242,6 +244,15 @@ public void close() {
     public CompletableFuture<RangeResponse> range(RangeRequest request) {
         RoutingHeader header = request.getHeader();
 
+        if (header.getRangeId() <= 0L) {
+            return CompletableFuture.completedFuture(RangeResponse.newBuilder()
+                .setHeader(ResponseHeader.newBuilder()
+                    .setCode(StatusCode.BAD_REQUEST)
+                    .setRoutingHeader(request.getHeader())
+                    .build())
+                .build());
+        }
+
         RangeId rid = RangeId.of(header.getStreamId(), header.getRangeId());
         TableStore store = tableStoreCache.getTableStore(rid);
         if (null != store) {
@@ -256,6 +267,15 @@ public void close() {
     public CompletableFuture<PutResponse> put(PutRequest request) {
         RoutingHeader header = request.getHeader();
 
+        if (header.getRangeId() <= 0L) {
+            return CompletableFuture.completedFuture(PutResponse.newBuilder()
+                .setHeader(ResponseHeader.newBuilder()
+                    .setCode(StatusCode.BAD_REQUEST)
+                    .setRoutingHeader(request.getHeader())
+                    .build())
+                .build());
+        }
+
         RangeId rid = RangeId.of(header.getStreamId(), header.getRangeId());
         TableStore store = tableStoreCache.getTableStore(rid);
         if (null != store) {
@@ -270,6 +290,15 @@ public void close() {
     public CompletableFuture<DeleteRangeResponse> delete(DeleteRangeRequest request) {
         RoutingHeader header = request.getHeader();
 
+        if (header.getRangeId() <= 0L) {
+            return CompletableFuture.completedFuture(DeleteRangeResponse.newBuilder()
+                .setHeader(ResponseHeader.newBuilder()
+                    .setCode(StatusCode.BAD_REQUEST)
+                    .setRoutingHeader(request.getHeader())
+                    .build())
+                .build());
+        }
+
         RangeId rid = RangeId.of(header.getStreamId(), header.getRangeId());
         TableStore store = tableStoreCache.getTableStore(rid);
         if (null != store) {
@@ -284,6 +313,15 @@ public void close() {
     public CompletableFuture<TxnResponse> txn(TxnRequest request) {
         RoutingHeader header = request.getHeader();
 
+        if (header.getRangeId() <= 0L) {
+            return CompletableFuture.completedFuture(TxnResponse.newBuilder()
+                .setHeader(ResponseHeader.newBuilder()
+                    .setCode(StatusCode.BAD_REQUEST)
+                    .setRoutingHeader(request.getHeader())
+                    .build())
+                .build());
+        }
+
         RangeId rid = RangeId.of(header.getStreamId(), header.getRangeId());
         TableStore store = tableStoreCache.getTableStore(rid);
         if (null != store) {
@@ -298,6 +336,15 @@ public void close() {
     public CompletableFuture<IncrementResponse> incr(IncrementRequest request) 
{
         RoutingHeader header = request.getHeader();
 
+        if (header.getRangeId() <= 0L) {
+            return 
CompletableFuture.completedFuture(IncrementResponse.newBuilder()
+                .setHeader(ResponseHeader.newBuilder()
+                    .setCode(StatusCode.BAD_REQUEST)
+                    .setRoutingHeader(request.getHeader())
+                    .build())
+                .build());
+        }
+
         RangeId rid = RangeId.of(header.getStreamId(), header.getRangeId());
         TableStore store = tableStoreCache.getTableStore(rid);
         if (null != store) {
diff --git a/tests/docker-images/current-version-image/Dockerfile 
b/tests/docker-images/current-version-image/Dockerfile
index c174b6a318..7d998a46de 100644
--- a/tests/docker-images/current-version-image/Dockerfile
+++ b/tests/docker-images/current-version-image/Dockerfile
@@ -35,7 +35,8 @@ ENV JAVA_HOME=/usr/lib/jvm/jre-1.8.0
 # prepare utils
 RUN set -x \
     && adduser "${BK_USER}" \
-    && yum install -y java-1.8.0-openjdk-headless wget bash python sudo netcat 
\
+    && yum install -y epel-release \
+    && yum install -y java-1.8.0-openjdk-headless wget bash python-pip 
python-devel sudo netcat gcc gcc-c++ \
     && mkdir -pv /opt \
     && cd /opt \
     && yum clean all
@@ -47,8 +48,13 @@ RUN mv /opt/${PKG_NAME} /opt/bookkeeper
 WORKDIR /opt/bookkeeper
 
 COPY target/scripts /opt/bookkeeper/scripts
+COPY scripts/install-python-client.sh /opt/bookkeeper/scripts
 RUN chmod +x -R /opt/bookkeeper/scripts/
 
+# copy the python client
+ADD target/bookkeeper-client/ /opt/bookkeeper/bookkeeper-client
+RUN /opt/bookkeeper/scripts/install-python-client.sh
+
 ENTRYPOINT [ "/bin/bash", "/opt/bookkeeper/scripts/entrypoint.sh" ]
 CMD ["bookie"]
 
diff --git a/tests/docker-images/current-version-image/pom.xml 
b/tests/docker-images/current-version-image/pom.xml
index 0d2c95da5c..edffc64814 100644
--- a/tests/docker-images/current-version-image/pom.xml
+++ b/tests/docker-images/current-version-image/pom.xml
@@ -46,6 +46,51 @@
       </activation>
       <build>
         <plugins>
+          <!-- build cpp client, copy the wheel file and then build docker 
image -->
+          <plugin>
+            <groupId>org.codehaus.mojo</groupId>
+            <artifactId>exec-maven-plugin</artifactId>
+            <version>${exec-maven-plugin.version}</version>
+            <executions>
+              <execution>
+                <id>build-python-client</id>
+                <phase>generate-resources</phase>
+                <goals>
+                  <goal>exec</goal>
+                </goals>
+                <configuration>
+                  
<workingDirectory>${project.basedir}/target</workingDirectory>
+                  
<executable>${project.basedir}/../../../stream/clients/python/scripts/docker_build.sh</executable>
+                </configuration>
+              </execution>
+            </executions>
+          </plugin>
+          <!-- this task is used for copy docker scripts & python wheel file 
to build docker image -->
+          <plugin>
+            <groupId>org.apache.maven.plugins</groupId>
+            <artifactId>maven-antrun-plugin</artifactId>
+            <version>${maven-antrun-plugin.version}</version>
+            <executions>
+              <execution>
+                <phase>generate-resources</phase>
+                <goals>
+                  <goal>run</goal>
+                </goals>
+                <configuration>
+                  <tasks>
+                    <echo>copy python wheel file</echo>
+                    <mkdir dir="${project.basedir}/target/bookkeeper-client"/>
+                    <copydir 
src="${project.basedir}/../../../stream/clients/python/dist"
+                             
dest="${project.basedir}/target/bookkeeper-client"/>
+                    <echo>copying docker scripts</echo>
+                    <mkdir dir="${project.basedir}/target/scripts" />
+                    <copydir src="${project.basedir}/../../../docker/scripts"
+                             dest="${project.basedir}/target/scripts" />
+                  </tasks>
+                </configuration>
+              </execution>
+            </executions>
+          </plugin>
           <plugin>
             <groupId>com.spotify</groupId>
             <artifactId>dockerfile-maven-plugin</artifactId>
@@ -97,27 +142,6 @@
               </execution>
             </executions>
           </plugin>
-          <!-- this task is used for copy docker scripts to build docker image 
-->
-          <plugin>
-            <groupId>org.apache.maven.plugins</groupId>
-            <artifactId>maven-antrun-plugin</artifactId>
-            <version>${maven-antrun-plugin.version}</version>
-            <executions>
-              <execution>
-                <phase>generate-resources</phase>
-                <goals>
-                  <goal>run</goal>
-                </goals>
-                <configuration>
-                  <tasks>
-                    <echo>copying docker scripts</echo>
-                    <mkdir dir="${basedir}/target/scripts" />
-                    <copydir src="${basedir}/../../../docker/scripts" 
dest="${basedir}/target/scripts" />
-                  </tasks>
-                </configuration>
-              </execution>
-            </executions>
-          </plugin>
         </plugins>
       </build>
     </profile>
diff --git 
a/tests/docker-images/current-version-image/scripts/install-python-client.sh 
b/tests/docker-images/current-version-image/scripts/install-python-client.sh
new file mode 100644
index 0000000000..2719b335ac
--- /dev/null
+++ b/tests/docker-images/current-version-image/scripts/install-python-client.sh
@@ -0,0 +1,24 @@
+#!/usr/bin/env bash
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+set -x
+
+WHEEL_FILE=`ls /opt/bookkeeper/bookkeeper-client/*.whl`
+pip install ${WHEEL_FILE}


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

Reply via email to