I think it is worth mentioning in the Hive read documentation that it
cannot read a table that has more than 32,767 partitions.
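
To make the limit concrete, here is a minimal, self-contained sketch. It
does not call Hive or Flink: `PartitionLimitSketch`, `partitionNames` and
this `listPartitionNames` are hypothetical stand-ins that I made up for
illustration, and the truncation behaviour is an assumption. Only the
`short` type of the max_parts parameter is taken from the signature quoted
below.

```java
import java.util.ArrayList;
import java.util.List;

class PartitionLimitSketch {

    // Hypothetical helper: builds n partition names in ascending order.
    static List<String> partitionNames(int n) {
        List<String> names = new ArrayList<>(n);
        for (int i = 0; i < n; i++) {
            names.add(String.format("p=%05d", i));
        }
        return names;
    }

    // Hypothetical stand-in for the metastore call. Only the `short maxParts`
    // parameter mirrors the quoted signature; the truncation logic is assumed.
    static List<String> listPartitionNames(List<String> all, short maxParts) {
        int limit = Math.max(0, Math.min(all.size(), maxParts));
        return new ArrayList<>(all.subList(0, limit));
    }

    public static void main(String[] args) {
        List<String> all = partitionNames(40_000);
        // A short cannot carry a larger limit than Short.MAX_VALUE (32767).
        List<String> fetched = listPartitionNames(all, Short.MAX_VALUE);
        System.out.println("partitions in table: " + all.size());
        System.out.println("partitions fetched:  " + fetched.size());
        System.out.println("latest visible:      " + fetched.contains("p=39999"));
    }
}
```

With 40,000 partitions in the sketch, only 32,767 names come back and the
newest one is not among them, which matches the symptom of the streaming
source never seeing the recent partitions.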

On Thu, Mar 28, 2024 at 5:10 PM Xiaolong Wang <xiaolong.w...@smartnews.com>
wrote:

> I found the reason:
>
> Flink uses Hive's IMetaStoreClient to fetch partitions, through the
> following method:
>
>   List<String> listPartitionNames(String db_name, String tbl_name,
>     short max_parts) throws MetaException, TException;
>
> where max_parts is the maximum number of partitions to fetch from the
> Hive metastore. Because the parameter is a short, at most Short.MAX_VALUE
> (32767) partitions can be fetched.
>
> The table has far more partitions than that, so the list-partitions
> operation cannot fetch all of them, and the source therefore cannot
> consume the most recent partitions.
>
> On Tue, Mar 26, 2024 at 5:00 PM Xiaolong Wang <xiaolong.w...@smartnews.com>
> wrote:
>
>> Hi,
>>
>> I found a weird bug when reading a Hive table as a streaming source.
>>
>> In summary, if the first partition column is not time-related, then the
>> Hive table cannot be read as a streaming source.
>>
>> For example, I have a Hive table defined as:
>>
>> ```
>> CREATE TABLE article (
>> id BIGINT,
>> edition STRING,
>> dt STRING,
>> hh STRING
>> )
>> PARTITIONED BY (edition, dt, hh)
>> USING orc;
>> ```
>> Then I tried to query it as a streaming source:
>>
>> ```
>> INSERT INTO kafka_sink
>> SELECT id
>> FROM article /*+ OPTIONS('streaming-source.enable' = 'true',
>> 'streaming-source.partition-order' = 'partition-name',
>> 'streaming-source.consume-start-offset' =
>> 'edition=en_US/dt=2024-03-26/hh=00') */
>> ```
>>
>> But I saw no output in `kafka_sink`.
>>
>> Then I defined an external table that points to the same path but has no
>> `edition` partition:
>>
>> ```
>> CREATE TABLE en_article (
>> id BIGINT,
>> edition STRING,
>> dt STRING,
>> hh STRING
>> )
>> PARTITIONED BY (dt, hh)
>> LOCATION 's3://xxx/article/edition=en_US'
>> USING orc;
>> ```
>>
>> Then I ran the insert with the following statement:
>>
>> ```
>> INSERT INTO kafka_sink
>> SELECT id
>> FROM en_article /*+ OPTIONS('streaming-source.enable' = 'true',
>> 'streaming-source.partition-order' = 'partition-name',
>> 'streaming-source.consume-start-offset' = 'dt=2024-03-26/hh=00') */
>> ```
>>
>> This time the data was written to the sink.
>>
>>
>
