This is an automated email from the ASF dual-hosted git repository.
lupeng pushed a commit to branch branch-3
in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/branch-3 by this push:
new 5fe2d4d1a4e HBASE-29457 thrift1/DemoClient.py is not running (#7529) (#7711)
5fe2d4d1a4e is described below
commit 5fe2d4d1a4ebd94180045ebc77520ecec37c788b
Author: Liu Xiao <[email protected]>
AuthorDate: Wed Feb 25 23:04:50 2026 +0800
HBASE-29457 thrift1/DemoClient.py is not running (#7529) (#7711)
Signed-off-by: Peng Lu <[email protected]>
---
.../org/apache/hadoop/hbase/thrift/DemoClient.java | 26 +-
.../src/main/python/thrift1/DemoClient.py | 429 +++++++++++----------
2 files changed, 243 insertions(+), 212 deletions(-)
diff --git a/hbase-examples/src/main/java/org/apache/hadoop/hbase/thrift/DemoClient.java b/hbase-examples/src/main/java/org/apache/hadoop/hbase/thrift/DemoClient.java
index ca686fb1749..d7257aeae25 100644
--- a/hbase-examples/src/main/java/org/apache/hadoop/hbase/thrift/DemoClient.java
+++ b/hbase-examples/src/main/java/org/apache/hadoop/hbase/thrift/DemoClient.java
@@ -139,9 +139,8 @@ public class DemoClient {
}
// Create the demo table with two column families, entry: and unused:
- ArrayList<ColumnDescriptor> columns = new ArrayList<>(2);
- ColumnDescriptor col;
- col = new ColumnDescriptor();
+ List<ColumnDescriptor> columns = new ArrayList<>(2);
+ ColumnDescriptor col = new ColumnDescriptor();
col.name = ByteBuffer.wrap(bytes("entry:"));
col.timeToLive = Integer.MAX_VALUE;
col.maxVersions = 10;
@@ -152,7 +151,6 @@ public class DemoClient {
columns.add(col);
System.out.println("creating table: " + ClientUtils.utf8(demoTable.array()));
-
try {
client.createTable(demoTable, columns);
client.createTable(disabledTable, columns);
@@ -162,7 +160,6 @@ public class DemoClient {
System.out.println("column families in " + ClientUtils.utf8(demoTable.array()) + ": ");
Map<ByteBuffer, ColumnDescriptor> columnMap = client.getColumnDescriptors(demoTable);
-
for (ColumnDescriptor col2 : columnMap.values()) {
System.out.println(
" column: " + ClientUtils.utf8(col2.name.array()) + ", maxVer: " + col2.maxVersions);
@@ -210,8 +207,8 @@ public class DemoClient {
client.mutateRow(demoTable, ByteBuffer.wrap(invalid), mutations, dummyAttributes);
// Run a scanner on the rows we just created
- ArrayList<ByteBuffer> columnNames = new ArrayList<>();
- columnNames.add(ByteBuffer.wrap(bytes("entry:")));
+ List<ByteBuffer> columnNames = new ArrayList<>();
+ columnNames.add(ByteBuffer.wrap(bytes("entry")));
System.out.println("Starting scanner...");
int scanner =
@@ -226,6 +223,7 @@ public class DemoClient {
printRow(entry);
}
+ System.out.println("Scanner finished...");
// Run some operations on a bunch of rows
for (int i = 100; i >= 0; --i) {
@@ -257,9 +255,8 @@ public class DemoClient {
client.mutateRow(demoTable, ByteBuffer.wrap(row), mutations, dummyAttributes);
printRow(client.getRow(demoTable, ByteBuffer.wrap(row), dummyAttributes));
- Mutation m;
mutations = new ArrayList<>(2);
- m = new Mutation();
+ Mutation m = new Mutation();
m.column = ByteBuffer.wrap(bytes("entry:foo"));
m.isDelete = true;
mutations.add(m);
@@ -314,7 +311,7 @@ public class DemoClient {
System.exit(-1);
}
- System.out.println("");
+ System.out.println();
}
// scan all rows/columnNames
@@ -322,9 +319,10 @@ public class DemoClient {
for (ColumnDescriptor col2 : client.getColumnDescriptors(demoTable).values()) {
System.out.println("column with name: " + ClientUtils.utf8(col2.name));
- System.out.println(col2.toString());
-
- columnNames.add(col2.name);
+ System.out.println(col2);
+ // remove the trailing ':' from the family name
+ col2.name.limit(col2.name.limit() - 1);
+ columnNames.add(col2.name.slice());
}
System.out.println("Starting scanner...");
@@ -335,7 +333,7 @@ public class DemoClient {
List<TRowResult> entry = client.scannerGet(scanner);
if (entry.isEmpty()) {
- System.out.println("Scanner finished");
+ System.out.println("Scanner finished...");
break;
}
diff --git a/hbase-examples/src/main/python/thrift1/DemoClient.py b/hbase-examples/src/main/python/thrift1/DemoClient.py
index a030a95a434..82441b35d2b 100644
--- a/hbase-examples/src/main/python/thrift1/DemoClient.py
+++ b/hbase-examples/src/main/python/thrift1/DemoClient.py
@@ -1,212 +1,245 @@
#!/usr/bin/env python
-'''
- Licensed to the Apache Software Foundation (ASF) under one
- or more contributor license agreements. See the NOTICE file
- distributed with this work for additional information
- regarding copyright ownership. The ASF licenses this file
- to you under the Apache License, Version 2.0 (the
- "License"); you may not use this file except in compliance
- with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
-'''
+"""
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+"""
import sys
import time
-import os
from thrift.transport import TSocket, TTransport
from thrift.protocol import TBinaryProtocol
from gen_py.hbase import ttypes
from gen_py.hbase.Hbase import Client, ColumnDescriptor, Mutation
+
def printVersions(row, versions):
- print("row: " + row + ", values: ", end=' ')
- for cell in versions:
- print(cell.value + "; ", end=' ')
- print()
+ print("row: " + row.decode() + ", values: ", end="")
+ for cell in versions:
+ print(cell.value.decode() + "; ", end="")
+ print()
-def printRow(entry):
- print("row: " + entry.row + ", cols:", end=' ')
- for k in sorted(entry.columns):
- print(k + " => " + entry.columns[k].value, end=' ')
- print()
+def printRow(row_result):
+ for res in row_result:
+ sorted_cols = dict(sorted(res.columns.items()))
+ row_str = ""
+ for k, v in sorted_cols.items():
+ # ignore error to prevent UnicodeDecodeError: 'utf-8' codec can't decode byte 0xfc in
+ # position 4: invalid start byte
+ row_str += (
+ f"{k.decode('utf-8')} => {v.value.decode('utf-8', errors='ignore')}; "
+ )
+ print(f"row: {res.row.decode('utf-8', errors='ignore')}, cols: {row_str}")
-def demo_client(host, port, is_framed_transport):
- # Make socket
- socket = TSocket.TSocket(host, port)
-
- # Make transport
- if is_framed_transport:
- transport = TTransport.TFramedTransport(socket)
- else:
- transport = TTransport.TBufferedTransport(socket)
-
- # Wrap in a protocol
- protocol = TBinaryProtocol.TBinaryProtocol(transport)
-
- # Create a client to use the protocol encoder
- client = Client(protocol)
-
- # Connect!
- transport.open()
-
- # Check Thrift Server Type
- serverType = client.getThriftServerType()
- if serverType != ttypes.TThriftServerType.ONE:
- raise RuntimeError(f"Mismatch between client and server, server type is {serverType}")
-
- t = "demo_table"
-
- #
- # Scan all tables, look for the demo table and delete it.
- #
- print("scanning tables...")
- for table in client.getTableNames():
- print(f" found: {table}")
- if table == t:
- if client.isTableEnabled(table):
- print(f" disabling table: {t}")
- client.disableTable(table)
- print(f" deleting table: {t}")
- client.deleteTable(table)
-
- columns = []
- col = ColumnDescriptor()
- col.name = 'entry:'
- col.maxVersions = 10
- columns.append(col)
- col = ColumnDescriptor()
- col.name = 'unused:'
- columns.append(col)
-
- try:
- print(f"creating table: {t}")
- client.createTable(t, columns)
- except ttypes.AlreadyExists as ae:
- print("WARN: " + ae.message)
-
- cols = client.getColumnDescriptors(t)
- print(f"column families in {t}")
- for col_name in cols.keys():
- col = cols[col_name]
- print(f" column: {col.name}, maxVer: {col.maxVersions}")
-
- dummy_attributes = {}
- #
- # Test UTF-8 handling
- #
- invalid = "foo-\xfc\xa1\xa1\xa1\xa1\xa1"
- valid = "foo-\xE7\x94\x9F\xE3\x83\x93\xE3\x83\xBC\xE3\x83\xAB";
-
- # non-utf8 is fine for data
- mutations = [Mutation(column="entry:foo",value=invalid)]
- print(str(mutations))
- client.mutateRow(t, "foo", mutations, dummy_attributes)
-
- # try empty strings
- mutations = [Mutation(column="entry:", value="")]
- client.mutateRow(t, "", mutations, dummy_attributes)
-
- # this row name is valid utf8
- mutations = [Mutation(column="entry:foo", value=valid)]
- client.mutateRow(t, valid, mutations, dummy_attributes)
-
- # non-utf8 is not allowed in row names
- try:
- mutations = [Mutation(column="entry:foo", value=invalid)]
- client.mutateRow(t, invalid, mutations, dummy_attributes)
- except ttypes.IOError as e:
- print(f'expected exception: {e.message}')
-
- # Run a scanner on the rows we just created
- print("Starting scanner...")
- scanner = client.scannerOpen(t, "", ["entry:"], dummy_attributes)
-
- r = client.scannerGet(scanner)
- while r:
- printRow(r[0])
- r = client.scannerGet(scanner)
- print("Scanner finished")
-
- #
- # Run some operations on a bunch of rows.
- #
- for e in range(100, 0, -1):
- # format row keys as "00000" to "00100"
- row = f"{row:05}"
-
- mutations = [Mutation(column="unused:", value="DELETE_ME")]
- client.mutateRow(t, row, mutations, dummy_attributes)
- printRow(client.getRow(t, row, dummy_attributes)[0])
- client.deleteAllRow(t, row, dummy_attributes)
-
- mutations = [Mutation(column="entry:num", value="0"),
- Mutation(column="entry:foo", value="FOO")]
- client.mutateRow(t, row, mutations, dummy_attributes)
- printRow(client.getRow(t, row, dummy_attributes)[0]);
-
- mutations = [Mutation(column="entry:foo",isDelete=True),
- Mutation(column="entry:num",value="-1")]
- client.mutateRow(t, row, mutations, dummy_attributes)
- printRow(client.getRow(t, row, dummy_attributes)[0])
-
- mutations = [Mutation(column="entry:num", value=str(e)),
- Mutation(column="entry:sqr", value=str(e*e))]
- client.mutateRow(t, row, mutations, dummy_attributes)
- printRow(client.getRow(t, row, dummy_attributes)[0])
-
- time.sleep(0.05)
-
- mutations = [Mutation(column="entry:num",value="-999"),
- Mutation(column="entry:sqr",isDelete=True)]
- client.mutateRowTs(t, row, mutations, 1, dummy_attributes) # shouldn't override latest
- printRow(client.getRow(t, row, dummy_attributes)[0])
-
- versions = client.getVer(t, row, "entry:num", 10, dummy_attributes)
- printVersions(row, versions)
- if len(versions) != 3:
- print("FATAL: wrong # of versions")
- sys.exit(-1)
-
- r = client.get(t, row, "entry:foo", dummy_attributes)
- # just to be explicit, we get lists back, if it's empty there was no matching row.
- if len(r) > 0:
- raise RuntimeError("shouldn't get here!")
-
- columnNames = []
- for (col, desc) in client.getColumnDescriptors(t).items():
- print("column with name: "+desc.name)
- print(desc)
- columnNames.append(desc.name+":")
-
- print("Starting scanner...")
- scanner = client.scannerOpenWithStop(t, "00020", "00040", columnNames, dummy_attributes)
-
- r = client.scannerGet(scanner)
- while r:
- printRow(r[0])
- r = client.scannerGet(scanner)
-
- client.scannerClose(scanner)
- print("Scanner finished")
-
- transport.close()
-
-
-if __name__ == '__main__':
- if len(sys.argv) < 3:
- print(f'usage: {__file__} <host> <port>')
- sys.exit(1)
-
- host = sys.argv[1]
- port = sys.argv[2]
- is_framed_transport = False
- demo_client(host, port, is_framed_transport)
+def demo_client(host, port, is_framed_transport):
+ # Make socket
+ socket = TSocket.TSocket(host, port)
+
+ # Make transport
+ if is_framed_transport:
+ transport = TTransport.TFramedTransport(socket)
+ else:
+ transport = TTransport.TBufferedTransport(socket)
+
+ # Wrap in a protocol
+ protocol = TBinaryProtocol.TBinaryProtocol(transport)
+
+ # Create a client to use the protocol encoder
+ client = Client(protocol)
+
+ # Connect!
+ transport.open()
+
+ # Check Thrift Server Type
+ serverType = client.getThriftServerType()
+ if serverType != ttypes.TThriftServerType.ONE:
+ raise RuntimeError(
+ f"Mismatch between client and server, server type is {serverType}"
+ )
+
+ demo_table = "demo_table".encode()
+ disable_table = "disabled_table".encode()
+
+ # Scan all tables, look for the demo table and delete it.
+ print("scanning tables...")
+ for table in client.getTableNames():
+ print(f" found: {table.decode()}")
+ if table in {demo_table, disable_table}:
+ if client.isTableEnabled(table):
+ print(f" disabling table: {table.decode()}")
+ client.disableTable(table)
+ print(f" deleting table: {table.decode()}")
+ client.deleteTable(table)
+
+ # Create the demo table with two column families, entry: and unused:
+ columns = []
+ col = ColumnDescriptor()
+ col.name = "entry:".encode()
+ col.maxVersions = 10
+ columns.append(col)
+ col = ColumnDescriptor()
+ col.name = "unused:".encode()
+ columns.append(col)
+
+ print(f"creating table: {demo_table.decode()}")
+ try:
+ client.createTable(demo_table, columns)
+ client.createTable(disable_table, columns)
+ except ttypes.AlreadyExists as ae:
+ print("WARN: " + ae.message)
+
+ cols = client.getColumnDescriptors(demo_table)
+ print(f"column families in {demo_table.decode()}: ")
+ for col_name in cols.keys():
+ col = cols[col_name]
+ print(f" column: {col.name.decode()}, maxVer: {col.maxVersions}")
+
+ if client.isTableEnabled(disable_table):
+ print(f"disabling table: {disable_table.decode()}")
+ client.disableTable(disable_table)
+
+ dummy_attributes = {}
+
+ # Test UTF-8 handling
+ invalid = b"foo-\xfc\xa1\xa1\xa1\xa1"
+ valid = b"foo-\xe7\x94\x9f\xe3\x83\x93\xe3\x83\xbc\xe3\x83\xab"
+
+ # non-utf8 is fine for data
+ mutations = [Mutation(column="entry:foo".encode(), value=invalid, writeToWAL=False)]
+ client.mutateRow(demo_table, "foo".encode(), mutations, dummy_attributes)
+
+ # this row name is valid utf8
+ mutations = [Mutation(column="entry:foo".encode(), value=valid, writeToWAL=False)]
+ client.mutateRow(demo_table, valid, mutations, dummy_attributes)
+
+ # non-utf8 is now allowed in row names because HBase stores values as binary
+ mutations = [Mutation(column="entry:foo".encode(), value=invalid, writeToWAL=False)]
+ client.mutateRow(demo_table, invalid, mutations, dummy_attributes)
+
+ # Run a scanner on the rows we just created
+ print("Starting scanner...")
+ scanner = client.scannerOpen(
+ demo_table, "".encode(), ["entry".encode()], dummy_attributes
+ )
+
+ while True:
+ r = client.scannerGet(scanner)
+ if not r:
+ break
+ printRow(r)
+ print("Scanner finished...")
+
+ # Run some operations on a bunch of rows.
+ for e in range(100, -1, -1):
+ # format row keys as "00000" to "00100"
+ row = f"{e:05}".encode()
+
+ mutations = [
+ Mutation(
+ column="unused:".encode(), value="DELETE_ME".encode(), writeToWAL=False
+ )
+ ]
+ client.mutateRow(demo_table, row, mutations, dummy_attributes)
+ printRow(client.getRow(demo_table, row, dummy_attributes))
+ client.deleteAllRow(demo_table, row, dummy_attributes)
+
+ # sleep to force later timestamp
+ time.sleep(0.05)
+
+ mutations = [
+ Mutation(column="entry:num".encode(), value="0".encode(), writeToWAL=False),
+ Mutation(
+ column="entry:foo".encode(), value="FOO".encode(), writeToWAL=False
+ ),
+ ]
+ client.mutateRow(demo_table, row, mutations, dummy_attributes)
+ printRow(client.getRow(demo_table, row, dummy_attributes))
+
+ mutations = [
+ Mutation(column="entry:foo".encode(), isDelete=True),
+ Mutation(column="entry:num".encode(), value="-1".encode()),
+ ]
+ client.mutateRow(demo_table, row, mutations, dummy_attributes)
+ printRow(client.getRow(demo_table, row, dummy_attributes))
+
+ mutations = [
+ Mutation(
+ column="entry:num".encode(), value=str(e).encode(), writeToWAL=False
+ ),
+ Mutation(
+ column="entry:sqr".encode(), value=str(e * e).encode(), writeToWAL=False
+ ),
+ ]
+ client.mutateRow(demo_table, row, mutations, dummy_attributes)
+ printRow(client.getRow(demo_table, row, dummy_attributes))
+
+ # sleep to force later timestamp
+ time.sleep(0.05)
+
+ mutations = [
+ Mutation(column="entry:num".encode(), value="-999".encode()),
+ Mutation(column="entry:sqr".encode(), isDelete=True),
+ ]
+ # shouldn't override latest
+ client.mutateRowTs(demo_table, row, mutations, 1, dummy_attributes)
+ printRow(client.getRow(demo_table, row, dummy_attributes))
+
+ versions = client.getVer(
+ demo_table, row, "entry:num".encode(), 10, dummy_attributes
+ )
+ printVersions(row, versions)
+ if len(versions) != 3:
+ print("FATAL: wrong # of versions")
+ sys.exit(-1)
+
+ r = client.get(demo_table, row, "entry:foo".encode(), dummy_attributes)
+ # just to be explicit, we get lists back, if it's empty there was no matching row.
+ if len(r) > 0:
+ print("FATAL: shouldn't get here")
+ sys.exit(-1)
+ print()
+
+ # scan all rows/columnNames
+ columnNames = []
+ for col, desc in client.getColumnDescriptors(demo_table).items():
+ print(f"column with name: {desc.name.decode()}")
+ print(desc)
+ columnNames.append(desc.name[:-1])
+
+ print("\nStarting scanner...")
+ scanner = client.scannerOpenWithStop(
+ demo_table, "00020".encode(), "00040".encode(), columnNames, dummy_attributes
+ )
+ while True:
+ r = client.scannerGet(scanner)
+ if not r:
+ break
+ printRow(r)
+ print("Scanner finished...")
+ transport.close()
+
+
+if __name__ == "__main__":
+ if len(sys.argv) < 3:
+ print(f"usage: {__file__} <host> <port>")
+ sys.exit(1)
+
+ default_host = sys.argv[1]
+ default_port = sys.argv[2]
+
+ default_is_framed_transport = False
+ demo_client(default_host, default_port, default_is_framed_transport)