https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/connectors/filesystem.html
<https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/connectors/filesystem.html>
Here is my code:
my datastream source:
```
public static class MySource implements SourceFunction<UserInfo>{
String userids[] = {
"4760858d-2bec-483c-a535-291de04b2247",
"67088699-d4f4-43f2-913c-481bff8a2dc5",
"72f7b6a8-e1a9-49b4-9a0b-770c41e01bfb",
"dfa27cb6-bd94-4bc0-a90b-f7beeb9faa8b",
"aabbaa50-72f4-495c-b3a1-70383ee9d6a4",
"3218bbb9-5874-4d37-a82d-3e35e52d1702",
"3ebfb9602ac07779||3ebfe9612a007979",
"aec20d52-c2eb-4436-b121-c29ad4097f6c",
"e7e896cd939685d7||e7e8e6c1930689d7",
"a4b1e1db-55ef-4d9d-b9d2-18393c5f59ee"
};
@Override
public void run(SourceContext<UserInfo> sourceContext) throws Exception{
while (true){
String userid = userids[(int) (Math.random() * (userids.length -
1))];
UserInfo userInfo = new UserInfo();
userInfo.setUserId(userid);
userInfo.setAmount(Math.random() * 100);
userInfo.setTs(new Timestamp(System.currentTimeMillis()));
sourceContext.collect(userInfo);
Thread.sleep(100);
}
}
@Override
public void cancel(){
}
}
public static class UserInfo implements java.io.Serializable{
private String userId;
private Double amount;
private Timestamp ts;
public String getUserId(){
return userId;
}
public void setUserId(String userId){
this.userId = userId;
}
public Double getAmount(){
return amount;
}
public void setAmount(Double amount){
this.amount = amount;
}
public Timestamp getTs(){
return ts;
}
public void setTs(Timestamp ts){
this.ts = ts;
}
}
```
flink code:
```
StreamExecutionEnvironment bsEnv =
StreamExecutionEnvironment.getExecutionEnvironment();
bsEnv.enableCheckpointing(10000);
StreamTableEnvironment tEnv = StreamTableEnvironment.create(bsEnv);
DataStream<UserInfo> dataStream = bsEnv.addSource(new MySource())
//构造hive catalog
String name = "myhive";
String defaultDatabase = "default";
String hiveConfDir = "/Users/user/work/hive/conf"; // a local path
String version = "3.1.2";
HiveCatalog hive = new HiveCatalog(name, defaultDatabase, hiveConfDir, version);
tEnv.registerCatalog("myhive", hive);
tEnv.useCatalog("myhive");
tEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
tEnv.useDatabase("db1");
tEnv.createTemporaryView("users", dataStream);
String hiveSql = "CREATE external TABLE fs_table (\n" +
" user_id STRING,\n" +
" order_amount DOUBLE" +
") partitioned by (dt string,h string,m string) " +
"stored as ORC " +
"TBLPROPERTIES (\n" +
" 'partition.time-extractor.timestamp-pattern'='$dt $h:$m:00',\n" +
" 'sink.partition-commit.delay'='0s',\n" +
" 'sink.partition-commit.trigger'='partition-time',\n" +
" 'sink.partition-commit.policy.kind'='metastore'" +
")";
tEnv.executeSql(hiveSql);
String insertSql = "SELECT * FROM users";
tEnv.executeSql(insertSql);
```
And this is my flink configuration:
```
jobmanager.memory.process.size: 1600m
taskmanager.memory.process.size 4096m
taskmanager.numberOfTaskSlots: 1
parallelism.default: 1
```
And the exception is: java.util.concurrent.completionException:
org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailable and request
to ResourceManager for new slot failed.
According the exception message, it means the resource is in sufficient, but
the hadoop resource is enough, memory is 300+g, cores is 72, and the usage rate
is lower about 30%.
I have tried increase the taskmanager slot at flink run command with `flink run
-ys`, but it is not effective.
Here is the environment:
flink version: 1.12.0
java: 1.8
Please check what’s the problem is, really appreciate it. Thanks.