Copilot commented on code in PR #59828:
URL: https://github.com/apache/doris/pull/59828#discussion_r2686008034


##########
fe/fe-core/src/main/java/org/apache/doris/job/util/StreamingJobUtils.java:
##########
@@ -333,18 +334,42 @@ public static List<CreateTableCommand> 
generateCreateTableCmds(String targetDb,
         return createtblCmds;
     }
 
-    private static List<Column> getColumns(JdbcClient jdbcClient,
+    public static List<Column> getColumns(JdbcClient jdbcClient,
             String database,
             String table,
             List<String> primaryKeys) {
         List<Column> columns = jdbcClient.getColumnsFromJdbc(database, table);
         columns.forEach(col -> {
+            Preconditions.checkArgument(!col.getType().isUnsupported(),
+                    "Unsupported column type, table:[%s], column:[%s]", table, 
col.getName());
+            if (col.getType().isVarchar()) {
+                // The length of varchar needs to be multiplied by 3.
+                int len = col.getType().getLength() * 3;
+                if (col.getType().getLength() * 3 > 
ScalarType.MAX_VARCHAR_LENGTH) {

Review Comment:
   The varchar length is computed twice. Line 347 assigns `len = 
col.getType().getLength() * 3`, but line 348 repeats the expression 
`col.getType().getLength() * 3` in the condition instead of reusing `len`. 
Compare `len > ScalarType.MAX_VARCHAR_LENGTH` so the check and the assignment 
cannot drift apart.
   ```suggestion
                   if (len > ScalarType.MAX_VARCHAR_LENGTH) {
   ```
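
A minimal sketch of the widening rule with the product computed once. `VarcharWidening`, `widen`, and the `maxVarcharLength` parameter are hypothetical stand-ins for illustration, not Doris APIs; the real code would keep using `ScalarType` and `ScalarType.MAX_VARCHAR_LENGTH`. Computing the product as a `long` goes beyond what the comment asks for and is an assumption about what is desirable: it guards against `int` overflow for very large source lengths.

```java
import java.util.OptionalInt;

// Hypothetical helper, not part of Doris: models the "multiply by 3, else fall back" rule.
final class VarcharWidening {
    private VarcharWidening() {}

    /**
     * Returns the widened varchar length, or an empty result when the widened
     * length exceeds the limit and the caller should fall back to a string type.
     */
    static OptionalInt widen(int sourceLength, int maxVarcharLength) {
        // Compute the product once, as a long so it cannot overflow.
        long widened = (long) sourceLength * 3;
        if (widened > maxVarcharLength) {
            return OptionalInt.empty();
        }
        return OptionalInt.of((int) widened);
    }
}
```

The same value drives both the comparison and the resulting length, which is the point of the suggestion above.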



##########
fe/fe-core/src/test/java/org/apache/doris/job/util/StreamingJobUtilsTest.java:
##########
@@ -0,0 +1,177 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.job.util;
+
+import org.apache.doris.catalog.Column;
+import org.apache.doris.catalog.PrimitiveType;
+import org.apache.doris.catalog.ScalarType;
+import org.apache.doris.datasource.jdbc.client.JdbcClient;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.ArgumentMatchers;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.MockitoAnnotations;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+public class StreamingJobUtilsTest {
+
+    @Mock
+    private JdbcClient jdbcClient;
+
+    @Before
+    public void setUp() {
+        MockitoAnnotations.initMocks(this);

Review Comment:
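   The test calls `MockitoAnnotations.initMocks(this)`, which was deprecated in 
Mockito 3.4.0 in favor of `MockitoAnnotations.openMocks(this)`. If the 
project's Mockito version is 3.4.0 or later, use `openMocks` instead. It 
returns an `AutoCloseable` that should be closed after each test, for example 
in an `@After` method. The deprecated method may be removed in future versions 
of Mockito.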
   ```suggestion
           MockitoAnnotations.openMocks(this);
   ```



##########
fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/StreamingJobAction.java:
##########
@@ -67,6 +68,8 @@ private Object updateOffset(CommitOffsetRequest 
offsetRequest) {
 
         StreamingInsertJob streamingJob = (StreamingInsertJob) job;
         try {
+            LOG.info("Committing offset for job {}, task {}, offset {}",
+                    offsetRequest.getJobId(), offsetRequest.getTaskId(), 
offsetRequest.toString());

Review Comment:
   The log statement records jobId and taskId twice. The third placeholder, 
"offset {}", receives offsetRequest.toString(), which includes all fields 
(jobId, taskId, offset, scannedRows, scannedBytes) because of the @ToString 
annotation, yet jobId and taskId are already logged in the first two 
placeholders. Consider logging only the offset field 
(offsetRequest.getOffset()), or the fields that are not already logged 
(scannedRows, scannedBytes), or adjust the placeholders to match what you 
want to log.
   ```suggestion
                       offsetRequest.getJobId(), offsetRequest.getTaskId(), 
offsetRequest.getOffset());
   ```



##########
fe/fe-core/src/test/java/org/apache/doris/job/util/StreamingJobUtilsTest.java:
##########
@@ -0,0 +1,177 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.job.util;
+
+import org.apache.doris.catalog.Column;
+import org.apache.doris.catalog.PrimitiveType;
+import org.apache.doris.catalog.ScalarType;
+import org.apache.doris.datasource.jdbc.client.JdbcClient;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.ArgumentMatchers;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.MockitoAnnotations;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+public class StreamingJobUtilsTest {
+
+    @Mock
+    private JdbcClient jdbcClient;
+
+    @Before
+    public void setUp() {
+        MockitoAnnotations.initMocks(this);
+    }
+
+    @Test
+    public void testGetColumnsWithPrimaryKeySorting() throws Exception {
+        // Prepare test data
+        String database = "test_db";
+        String table = "test_table";
+        List<String> primaryKeys = Arrays.asList("id", "name");
+
+        // Create mock columns in random order
+        List<Column> mockColumns = new ArrayList<>();
+        mockColumns.add(new Column("age", 
ScalarType.createType(PrimitiveType.INT)));
+        mockColumns.add(new Column("id", 
ScalarType.createType(PrimitiveType.BIGINT)));
+        mockColumns.add(new Column("email", 
ScalarType.createVarcharType(100)));
+        mockColumns.add(new Column("name", ScalarType.createVarcharType(50)));
+        mockColumns.add(new Column("address", 
ScalarType.createVarcharType(200)));
+
+        
Mockito.when(jdbcClient.getColumnsFromJdbc(ArgumentMatchers.anyString(), 
ArgumentMatchers.anyString())).thenReturn(mockColumns);
+        List<Column> result = StreamingJobUtils.getColumns(jdbcClient, 
database, table, primaryKeys);
+
+        // Verify primary keys are at the front in correct order
+        Assert.assertEquals(5, result.size());
+        Assert.assertEquals("id", result.get(0).getName());
+        Assert.assertEquals("name", result.get(1).getName());
+        // Verify non-primary key columns follow
+        Assert.assertEquals("age", result.get(2).getName());
+        Assert.assertEquals("email", result.get(3).getName());
+        Assert.assertEquals("address", result.get(4).getName());
+    }

Review Comment:
   The test does not verify that varchar primary key columns have their length 
multiplied by 3. `testGetColumnsWithPrimaryKeySorting` includes varchar columns 
(`email`, `name`, `address`), of which `name` is a primary key, but it asserts 
only the ordering. Consider also asserting the resulting lengths, for example 
that `name` (declared as varchar(50)) ends up with length 150.



##########
fe/fe-core/src/main/java/org/apache/doris/job/util/StreamingJobUtils.java:
##########
@@ -333,18 +334,42 @@ public static List<CreateTableCommand> 
generateCreateTableCmds(String targetDb,
         return createtblCmds;
     }
 
-    private static List<Column> getColumns(JdbcClient jdbcClient,
+    public static List<Column> getColumns(JdbcClient jdbcClient,
             String database,
             String table,
             List<String> primaryKeys) {
         List<Column> columns = jdbcClient.getColumnsFromJdbc(database, table);
         columns.forEach(col -> {
+            Preconditions.checkArgument(!col.getType().isUnsupported(),
+                    "Unsupported column type, table:[%s], column:[%s]", table, 
col.getName());
+            if (col.getType().isVarchar()) {
+                // The length of varchar needs to be multiplied by 3.
+                int len = col.getType().getLength() * 3;
+                if (col.getType().getLength() * 3 > 
ScalarType.MAX_VARCHAR_LENGTH) {
+                    col.setType(ScalarType.createStringType());
+                } else {
+                    col.setType(ScalarType.createVarcharType(len));
+                }
+            }
+
             // string can not to be key
             if (primaryKeys.contains(col.getName())
                     && col.getDataType() == PrimitiveType.STRING) {
                 
col.setType(ScalarType.createVarcharType(ScalarType.MAX_VARCHAR_LENGTH));
             }
         });
+
+        // sort columns for primary keys
+        columns.sort(
+                Comparator
+                        .comparing((Column col) -> 
!primaryKeys.contains(col.getName()))
+                        .thenComparing(
+                                col -> primaryKeys.contains(col.getName())
+                                        ? primaryKeys.indexOf(col.getName())
+                                        : Integer.MAX_VALUE

Review Comment:
   The sorting comparator calls `primaryKeys.contains()` and 
`primaryKeys.indexOf()` on every comparison. Since `primaryKeys` is a List, 
both operations are O(k) in the number of primary keys k, so sorting n columns 
costs O(k · n log n) instead of O(n log n). Consider building a Map from 
column name to index before sorting (this needs imports for `java.util.Map` 
and `java.util.HashMap`). The gain matters mainly when there are many columns 
or primary keys.
   ```suggestion
           Map<String, Integer> primaryKeyOrder = new HashMap<>();
           for (int i = 0; i < primaryKeys.size(); i++) {
               primaryKeyOrder.put(primaryKeys.get(i), i);
           }
           columns.sort(
                   Comparator
                           .comparing((Column col) -> 
!primaryKeyOrder.containsKey(col.getName()))
                           .thenComparing(
                                   col -> 
primaryKeyOrder.getOrDefault(col.getName(), Integer.MAX_VALUE)
   ```
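
The lookup-map idea can be sketched in isolation as follows. This is an illustration under stated assumptions, not the PR's code: `PrimaryKeyOrdering` and `sortByPrimaryKeys` are hypothetical names, and plain column-name strings stand in for Doris `Column` objects.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper, not part of Doris: orders column names by primary-key position.
final class PrimaryKeyOrdering {
    private PrimaryKeyOrdering() {}

    /**
     * Returns a new list with primary-key columns first, in primaryKeys order,
     * followed by the remaining columns in their original relative order.
     */
    static List<String> sortByPrimaryKeys(List<String> columnNames, List<String> primaryKeys) {
        // Build the name -> position map once; putIfAbsent keeps the first
        // occurrence, matching what List.indexOf would return.
        Map<String, Integer> order = new HashMap<>();
        for (int i = 0; i < primaryKeys.size(); i++) {
            order.putIfAbsent(primaryKeys.get(i), i);
        }
        List<String> sorted = new ArrayList<>(columnNames);
        // The sort is stable, so non-key columns (all ranked MAX_VALUE)
        // keep their relative order after the key columns.
        sorted.sort(Comparator.comparingInt(
                (String name) -> order.getOrDefault(name, Integer.MAX_VALUE)));
        return sorted;
    }
}
```

Because non-key columns all receive the rank `Integer.MAX_VALUE`, a single sort key is enough in this sketch; the separate boolean "is primary key" comparison becomes optional.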



##########
fe/fe-core/src/test/java/org/apache/doris/job/util/StreamingJobUtilsTest.java:
##########
@@ -0,0 +1,177 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.job.util;
+
+import org.apache.doris.catalog.Column;
+import org.apache.doris.catalog.PrimitiveType;
+import org.apache.doris.catalog.ScalarType;
+import org.apache.doris.datasource.jdbc.client.JdbcClient;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.ArgumentMatchers;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.MockitoAnnotations;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+public class StreamingJobUtilsTest {
+
+    @Mock
+    private JdbcClient jdbcClient;
+
+    @Before
+    public void setUp() {
+        MockitoAnnotations.initMocks(this);
+    }
+
+    @Test
+    public void testGetColumnsWithPrimaryKeySorting() throws Exception {
+        // Prepare test data
+        String database = "test_db";
+        String table = "test_table";
+        List<String> primaryKeys = Arrays.asList("id", "name");
+
+        // Create mock columns in random order
+        List<Column> mockColumns = new ArrayList<>();
+        mockColumns.add(new Column("age", 
ScalarType.createType(PrimitiveType.INT)));
+        mockColumns.add(new Column("id", 
ScalarType.createType(PrimitiveType.BIGINT)));
+        mockColumns.add(new Column("email", 
ScalarType.createVarcharType(100)));
+        mockColumns.add(new Column("name", ScalarType.createVarcharType(50)));
+        mockColumns.add(new Column("address", 
ScalarType.createVarcharType(200)));
+
+        
Mockito.when(jdbcClient.getColumnsFromJdbc(ArgumentMatchers.anyString(), 
ArgumentMatchers.anyString())).thenReturn(mockColumns);
+        List<Column> result = StreamingJobUtils.getColumns(jdbcClient, 
database, table, primaryKeys);
+
+        // Verify primary keys are at the front in correct order
+        Assert.assertEquals(5, result.size());
+        Assert.assertEquals("id", result.get(0).getName());
+        Assert.assertEquals("name", result.get(1).getName());
+        // Verify non-primary key columns follow
+        Assert.assertEquals("age", result.get(2).getName());
+        Assert.assertEquals("email", result.get(3).getName());
+        Assert.assertEquals("address", result.get(4).getName());
+    }
+
+    @Test
+    public void testGetColumnsWithVarcharTypeConversion() throws Exception {
+        String database = "test_db";
+        String table = "test_table";
+        List<String> primaryKeys = Arrays.asList("id");
+
+        List<Column> mockColumns = new ArrayList<>();
+        mockColumns.add(new Column("id", 
ScalarType.createType(PrimitiveType.INT)));
+        mockColumns.add(new Column("short_name", 
ScalarType.createVarcharType(50)));
+        mockColumns.add(new Column("long_name", 
ScalarType.createVarcharType(20000)));
+
+        
Mockito.when(jdbcClient.getColumnsFromJdbc(ArgumentMatchers.anyString(), 
ArgumentMatchers.anyString())).thenReturn(mockColumns);
+        List<Column> result = StreamingJobUtils.getColumns(jdbcClient, 
database, table, primaryKeys);
+
+        // Verify varchar length multiplication by 3
+        Column shortName = result.stream()
+                .filter(col -> col.getName().equals("short_name"))
+                .findFirst()
+                .orElse(null);
+        Assert.assertNotNull(shortName);
+        Assert.assertEquals(150, shortName.getType().getLength()); // 50 * 3
+
+        // Verify long varchar becomes STRING type
+        Column longName = result.stream()
+                .filter(col -> col.getName().equals("long_name"))
+                .findFirst()
+                .orElse(null);
+        Assert.assertNotNull(longName);
+        Assert.assertTrue(longName.getType().isStringType());
+    }
+
+    @Test
+    public void testGetColumnsWithStringTypeAsPrimaryKey() throws Exception {
+        String database = "test_db";
+        String table = "test_table";
+        List<String> primaryKeys = Arrays.asList("id");
+
+        List<Column> mockColumns = new ArrayList<>();
+        mockColumns.add(new Column("id", ScalarType.createStringType()));
+        mockColumns.add(new Column("name", ScalarType.createVarcharType(50)));
+
+        
Mockito.when(jdbcClient.getColumnsFromJdbc(ArgumentMatchers.anyString(), 
ArgumentMatchers.anyString())).thenReturn(mockColumns);
+        List<Column> result = StreamingJobUtils.getColumns(jdbcClient, 
database, table, primaryKeys);
+
+        // Verify string type primary key is converted to varchar
+        Column idColumn = result.stream()
+                .filter(col -> col.getName().equals("id"))
+                .findFirst()
+                .orElse(null);
+        Assert.assertNotNull(idColumn);
+        Assert.assertTrue(idColumn.getType().isVarchar());
+        Assert.assertEquals(ScalarType.MAX_VARCHAR_LENGTH, 
idColumn.getType().getLength());
+    }
+
+    @Test
+    public void testGetColumnsWithEmptyPrimaryKeys() throws Exception {
+        String database = "test_db";
+        String table = "test_table";
+        List<String> primaryKeys = new ArrayList<>();
+
+        List<Column> mockColumns = new ArrayList<>();
+        mockColumns.add(new Column("col1", 
ScalarType.createType(PrimitiveType.INT)));
+        mockColumns.add(new Column("col2", ScalarType.createVarcharType(100)));
+        mockColumns.add(new Column("col3", 
ScalarType.createType(PrimitiveType.BIGINT)));
+
+        
Mockito.when(jdbcClient.getColumnsFromJdbc(ArgumentMatchers.anyString(), 
ArgumentMatchers.anyString())).thenReturn(mockColumns);
+        List<Column> result = StreamingJobUtils.getColumns(jdbcClient, 
database, table, primaryKeys);
+
+        // Verify columns maintain original order when no primary keys
+        Assert.assertEquals(3, result.size());
+        Assert.assertEquals("col1", result.get(0).getName());
+        Assert.assertEquals("col2", result.get(1).getName());
+        Assert.assertEquals("col3", result.get(2).getName());
+    }
+
+    @Test
+    public void testGetColumnsWithMultiplePrimaryKeys() throws Exception {
+        String database = "test_db";
+        String table = "test_table";
+        List<String> primaryKeys = Arrays.asList("pk3", "pk1", "pk2");
+
+        List<Column> mockColumns = new ArrayList<>();
+        mockColumns.add(new Column("data1", 
ScalarType.createType(PrimitiveType.INT)));
+        mockColumns.add(new Column("pk1", 
ScalarType.createType(PrimitiveType.INT)));
+        mockColumns.add(new Column("data2", 
ScalarType.createVarcharType(100)));
+        mockColumns.add(new Column("pk2", 
ScalarType.createType(PrimitiveType.BIGINT)));
+        mockColumns.add(new Column("pk3", 
ScalarType.createType(PrimitiveType.INT)));
+        mockColumns.add(new Column("data3", ScalarType.createVarcharType(50)));
+
+        
Mockito.when(jdbcClient.getColumnsFromJdbc(ArgumentMatchers.anyString(), 
ArgumentMatchers.anyString())).thenReturn(mockColumns);
+        List<Column> result = StreamingJobUtils.getColumns(jdbcClient, 
database, table, primaryKeys);
+
+        // Verify primary keys are sorted in the order defined in primaryKeys 
list
+        Assert.assertEquals(6, result.size());
+        Assert.assertEquals("pk3", result.get(0).getName());
+        Assert.assertEquals("pk1", result.get(1).getName());
+        Assert.assertEquals("pk2", result.get(2).getName());
+        // Verify non-primary keys follow
+        Assert.assertEquals("data1", result.get(3).getName());
+        Assert.assertEquals("data2", result.get(4).getName());
+        Assert.assertEquals("data3", result.get(5).getName());
+    }
+}

Review Comment:
   Missing test coverage for the unsupported column type validation. The code 
at lines 343-344 in StreamingJobUtils.java checks for unsupported column types 
and throws an exception, but there is no corresponding test case that verifies 
this behavior. Consider adding a test that validates the exception is thrown 
when an unsupported column type is encountered.
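
The behavior such a test would pin down can be sketched without Doris or JUnit on the classpath. Everything here is a hypothetical stand-in: `UnsupportedTypeCheck`, `Col`, `validate`, and `failureMessage` are illustrative names, and the `unsupported` flag models `col.getType().isUnsupported()`. The sketch assumes the check fails with an `IllegalArgumentException`, which is what Guava's `Preconditions.checkArgument` throws.

```java
import java.util.List;

// Hypothetical stand-in, not part of Doris: models the unsupported-type precondition.
final class UnsupportedTypeCheck {
    private UnsupportedTypeCheck() {}

    /** Minimal column model: a name plus a flag saying whether its type is unsupported. */
    static final class Col {
        final String name;
        final boolean unsupported;

        Col(String name, boolean unsupported) {
            this.name = name;
            this.unsupported = unsupported;
        }
    }

    /** Throws IllegalArgumentException for the first column whose type is unsupported. */
    static void validate(String table, List<Col> columns) {
        for (Col col : columns) {
            if (col.unsupported) {
                throw new IllegalArgumentException(String.format(
                        "Unsupported column type, table:[%s], column:[%s]", table, col.name));
            }
        }
    }

    /** Returns the exception message if validation fails, or null if it passes. */
    static String failureMessage(String table, List<Col> columns) {
        try {
            validate(table, columns);
            return null;
        } catch (IllegalArgumentException e) {
            return e.getMessage();
        }
    }
}
```

In the real test class, the equivalent check would mock `getColumnsFromJdbc` to return a column with an unsupported type and expect the exception from `StreamingJobUtils.getColumns`, for instance with `Assert.assertThrows` if the project's JUnit version provides it (JUnit 4.13 or later).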



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

