wuwangben opened a new issue #2397:
URL: https://github.com/apache/iceberg/issues/2397


   ### env:
   Flink version: 1.12.2
   Iceberg commit: 21e1922a8ddb93a82388ea86a5f500d9f23885b3
   ### code:
   
   PartitionSpec spec = PartitionSpec.builderFor(schemaIceberg).identity("bucketCity").identity("partDay").identity("partHour").build();
   
   
       public static void query(String baseHdfsPath, StreamExecutionEnvironment bsEnv) throws Exception {
           Map<String, String> catalogProperties = Maps.newHashMap();
           catalogProperties.put("type", "iceberg");
           catalogProperties.put(FlinkCatalogFactory.ICEBERG_CATALOG_TYPE, "hive");
           catalogProperties.put(CatalogProperties.WAREHOUSE_LOCATION, baseHdfsPath);
           catalogProperties.put(CatalogProperties.FILE_IO_IMPL, "org.apache.iceberg.aws.s3.S3FileIO");
           catalogProperties.put(AwsProperties.CLIENT_FACTORY, "com.cennavi.dataStore.s3.DidiS3ClinetFactory");
   
           Configuration hadoopConf = new Configuration();
           CatalogLoader catalogLoader = CatalogLoader.hive("hiveCataLogS3FIO", hadoopConf, catalogProperties);
   
           TableIdentifier tableid = TableIdentifier.of(Namespace.of("icebergS3FIO"), "clinkJsonS3FIO");
           TableLoader tableLoader = TableLoader.fromCatalog(catalogLoader, tableid);
   
           TableSchema flinkSchema = TableSchema.builder()
                   .field("bucketCity",DataTypes.INT())
                   .field("partDay",DataTypes.STRING())
                   .field("partHour",DataTypes.STRING())
                   .field("cityCode",DataTypes.STRING())
                   .field("baseTime", DataTypes.INT())
                   .field("deviceId", DataTypes.STRING())
                   .field("linkID", DataTypes.BIGINT())
                   .field("stopCarInfo", DataTypes.STRING())
                   .build();
           Expression filter =
                   Expressions.and(
                           Expressions.and(
                                   Expressions.equal("bucketCity", 110000),
                                   Expressions.equal("cityCode", "1100")
                           ),
                           Expressions.equal("baseTime",1617081280)
                   );
   
           DataStream<RowData> ds = FlinkSource.forRowData().env(bsEnv)
                   .filters(Collections.singletonList(filter)).project(flinkSchema)
                   .tableLoader(tableLoader).streaming(false).build();
   
           ds.process(new ProcessFunction<RowData, Object>() {
               @Override
               public void processElement(RowData rowData, Context context, Collector<Object> collector) throws Exception {
                   log.info("datainfo===" + rowData.getInt(0) + "," + rowData.getString(1) + "," + rowData.getString(2) + "," + rowData.getString(3) + "," + rowData.getInt(4));
               }
           });
           bsEnv.execute();
       }
   ### result
   
![image](https://user-images.githubusercontent.com/16200775/113094831-fad9fc80-9224-11eb-9f9c-97f68638fa6b.png)
   
   I want to get only the rows where baseTime = 1617081280, but rows with other values, such as 1617081279, are returned.
   
   Is it a bug or is my usage wrong?
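
To make the expected behaviour concrete, here is a minimal sketch in plain Java, with no Flink or Iceberg dependencies. The `Row` class, the `matches` and `applyFilter` helpers, and the sample rows are hypothetical stand-ins for illustration; only the three filter values and the two baseTime values come from the report above.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class ExpectedFilter {
    // Hypothetical stand-in for the three columns referenced by the filter.
    static final class Row {
        final int bucketCity;
        final String cityCode;
        final int baseTime;

        Row(int bucketCity, String cityCode, int baseTime) {
            this.bucketCity = bucketCity;
            this.cityCode = cityCode;
            this.baseTime = baseTime;
        }
    }

    // Row-level reading of the expression built in query():
    // bucketCity == 110000 AND cityCode == "1100" AND baseTime == 1617081280.
    static boolean matches(Row r) {
        return r.bucketCity == 110000
                && "1100".equals(r.cityCode)
                && r.baseTime == 1617081280;
    }

    // Keeps only the rows that satisfy the predicate.
    static List<Row> applyFilter(List<Row> rows) {
        return rows.stream().filter(ExpectedFilter::matches).collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Row> rows = Arrays.asList(
                new Row(110000, "1100", 1617081279),  // should be dropped
                new Row(110000, "1100", 1617081280)); // should be kept
        for (Row r : applyFilter(rows)) {
            System.out.println("baseTime=" + r.baseTime);
        }
    }
}
```

If the source applies the expression only when selecting files or partitions (an assumption, not something confirmed here), a row-level check like `matches` inside `processElement` might serve as a workaround.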




